]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Objecter.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / osdc / Objecter.h
index 7709607ef17349b42332b73495c9a5ec76aac6d8..3dfa1a2d7f70bec4d096974ad315ef7d96c76828 100644 (file)
@@ -25,7 +25,7 @@
 
 #include <boost/thread/shared_mutex.hpp>
 
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "include/buffer.h"
 #include "include/types.h"
 #include "include/rados/rados_types.hpp"
 #include "common/admin_socket.h"
 #include "common/ceph_time.h"
 #include "common/ceph_timer.h"
-#include "common/Finisher.h"
+#include "common/config_obs.h"
 #include "common/shunique_lock.h"
 #include "common/zipkin_trace.h"
+#include "common/Finisher.h"
+#include "common/Throttle.h"
 
 #include "messages/MOSDOp.h"
+#include "msg/Dispatcher.h"
 #include "osd/OSDMap.h"
 
-using namespace std;
 
 class Context;
 class Messenger;
@@ -82,7 +84,7 @@ struct ObjectOperation {
   }
 
   void set_last_op_flags(int flags) {
-    assert(!ops.empty());
+    ceph_assert(!ops.empty());
     ops.rbegin()->op.flags = flags;
   }
 
@@ -124,7 +126,7 @@ struct ObjectOperation {
     osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
     osd_op.op.xattr.value_len = data.length();
     if (name)
-      osd_op.indata.append(name);
+      osd_op.indata.append(name, osd_op.op.xattr.name_len);
     osd_op.indata.append(data);
   }
   void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
@@ -135,7 +137,7 @@ struct ObjectOperation {
     osd_op.op.xattr.cmp_op = cmp_op;
     osd_op.op.xattr.cmp_mode = cmp_mode;
     if (name)
-      osd_op.indata.append(name);
+      osd_op.indata.append(name, osd_op.op.xattr.name_len);
     osd_op.indata.append(data);
   }
   void add_call(int op, const char *cname, const char *method,
@@ -160,7 +162,7 @@ struct ObjectOperation {
     OSDOp& osd_op = add_op(op);
     osd_op.op.pgls.count = count;
     osd_op.op.pgls.start_epoch = start_epoch;
-    ::encode(cookie, osd_op.indata);
+    encode(cookie, osd_op.indata);
   }
   void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
                       collection_list_handle_t cookie, epoch_t start_epoch) {
@@ -169,10 +171,10 @@ struct ObjectOperation {
     osd_op.op.pgls.start_epoch = start_epoch;
     string cname = "pg";
     string mname = "filter";
-    ::encode(cname, osd_op.indata);
-    ::encode(mname, osd_op.indata);
+    encode(cname, osd_op.indata);
+    encode(mname, osd_op.indata);
     osd_op.indata.append(filter);
-    ::encode(cookie, osd_op.indata);
+    encode(cookie, osd_op.indata);
   }
   void add_alloc_hint(int op, uint64_t expected_object_size,
                       uint64_t expected_write_size,
@@ -234,12 +236,12 @@ struct ObjectOperation {
       : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        try {
          uint64_t size;
          ceph::real_time mtime;
-         ::decode(size, p);
-         ::decode(mtime, p);
+         decode(size, p);
+         decode(mtime, p);
          if (psize)
            *psize = size;
          if (pmtime)
@@ -285,7 +287,7 @@ struct ObjectOperation {
   // object cmpext
   struct C_ObjectOperation_cmpext : public Context {
     int *prval;
-    C_ObjectOperation_cmpext(int *prval)
+    explicit C_ObjectOperation_cmpext(int *prval)
       : prval(prval) {}
 
     void finish(int r) {
@@ -333,15 +335,15 @@ struct ObjectOperation {
                                  int *prval)
       : data_bl(data_bl), extents(extents), prval(prval) {}
     void finish(int r) override {
-      bufferlist::iterator iter = bl.begin();
+      auto iter = bl.cbegin();
       if (r >= 0) {
         // NOTE: it's possible the sub-op has not been executed but the result
         // code remains zeroed. Avoid the costly exception handling on a
         // potential IO path.
         if (bl.length() > 0) {
          try {
-           ::decode(*extents, iter);
-           ::decode(*data_bl, iter);
+           decode(*extents, iter);
+           decode(*data_bl, iter);
          } catch (buffer::error& e) {
            if (prval)
               *prval = -EIO;
@@ -443,18 +445,18 @@ struct ObjectOperation {
     }
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        try {
          if (pattrs)
-           ::decode(*pattrs, p);
+           decode(*pattrs, p);
          if (ptruncated) {
            std::map<std::string,bufferlist> ignore;
            if (!pattrs) {
-             ::decode(ignore, p);
+             decode(ignore, p);
              pattrs = &ignore;
            }
            if (!p.end()) {
-             ::decode(*ptruncated, p);
+             decode(*ptruncated, p);
            } else {
              // the OSD did not provide this.  since old OSDs do not
              // enfoce omap result limits either, we can infer it from
@@ -485,21 +487,21 @@ struct ObjectOperation {
     }
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        try {
          if (pattrs)
-           ::decode(*pattrs, p);
+           decode(*pattrs, p);
          if (ptruncated) {
            std::set<std::string> ignore;
            if (!pattrs) {
-             ::decode(ignore, p);
+             decode(ignore, p);
              pattrs = &ignore;
            }
            if (!p.end()) {
-             ::decode(*ptruncated, p);
+             decode(*ptruncated, p);
            } else {
              // the OSD did not provide this.  since old OSDs do not
-             // enfoce omap result limits either, we can infer it from
+             // enforce omap result limits either, we can infer it from
              // the size of the result
              *ptruncated = (pattrs->size() == max_entries);
            }
@@ -520,17 +522,16 @@ struct ObjectOperation {
       : pwatchers(pw), prval(pr) {}
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        try {
          obj_list_watch_response_t resp;
-         ::decode(resp, p);
+         decode(resp, p);
          if (pwatchers) {
            for (list<watch_item_t>::iterator i = resp.entries.begin() ;
                 i != resp.entries.end() ; ++i) {
              obj_watch_t ow;
-             ostringstream sa;
-             sa << i->addr;
-             strncpy(ow.addr, sa.str().c_str(), 256);
+             string sa = i->addr.get_legacy_str();
+             strncpy(ow.addr, sa.c_str(), 256);
              ow.watcher_id = i->name.num();
              ow.cookie = i->cookie;
              ow.timeout_seconds = i->timeout_seconds;
@@ -553,10 +554,10 @@ struct ObjectOperation {
       : psnaps(ps), prval(pr) {}
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        try {
          obj_list_snap_response_t resp;
-         ::decode(resp, p);
+         decode(resp, p);
          if (psnaps) {
            psnaps->clones.clear();
            for (vector<clone_info>::iterator ci = resp.clones.begin();
@@ -611,12 +612,12 @@ struct ObjectOperation {
   }
   void setxattrs(map<string, bufferlist>& attrs) {
     bufferlist bl;
-    ::encode(attrs, bl);
+    encode(attrs, bl);
     add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
   }
   void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
     bufferlist bl;
-    ::encode(attrs, bl);
+    encode(attrs, bl);
     add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
   }
 
@@ -624,23 +625,6 @@ struct ObjectOperation {
   void tmap_update(bufferlist& bl) {
     add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
   }
-  void tmap_put(bufferlist& bl) {
-    add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
-  }
-  void tmap_get(bufferlist *pbl, int *prval) {
-    add_op(CEPH_OSD_OP_TMAPGET);
-    unsigned p = ops.size() - 1;
-    out_bl[p] = pbl;
-    out_rval[p] = prval;
-  }
-  void tmap_get() {
-    add_op(CEPH_OSD_OP_TMAPGET);
-  }
-  void tmap_to_omap(bool nullok=false) {
-     OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
-     if (nullok)
-       osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
-  }
 
   // objectmap
   void omap_get_keys(const string &start_after,
@@ -650,8 +634,8 @@ struct ObjectOperation {
                     int *prval) {
     OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
     bufferlist bl;
-    ::encode(start_after, bl);
-    ::encode(max_to_get, bl);
+    encode(start_after, bl);
+    encode(max_to_get, bl);
     op.op.extent.offset = 0;
     op.op.extent.length = bl.length();
     op.indata.claim_append(bl);
@@ -673,9 +657,9 @@ struct ObjectOperation {
                     int *prval) {
     OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
     bufferlist bl;
-    ::encode(start_after, bl);
-    ::encode(max_to_get, bl);
-    ::encode(filter_prefix, bl);
+    encode(start_after, bl);
+    encode(max_to_get, bl);
+    encode(filter_prefix, bl);
     op.op.extent.offset = 0;
     op.op.extent.length = bl.length();
     op.indata.claim_append(bl);
@@ -694,7 +678,7 @@ struct ObjectOperation {
                            int *prval) {
     OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
     bufferlist bl;
-    ::encode(to_get, bl);
+    encode(to_get, bl);
     op.op.extent.offset = 0;
     op.op.extent.length = bl.length();
     op.indata.claim_append(bl);
@@ -712,7 +696,7 @@ struct ObjectOperation {
                int *prval) {
     OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
     bufferlist bl;
-    ::encode(assertions, bl);
+    encode(assertions, bl);
     op.op.extent.offset = 0;
     op.op.extent.length = bl.length();
     op.indata.claim_append(bl);
@@ -735,6 +719,7 @@ struct ObjectOperation {
     uint32_t *out_data_digest;
     uint32_t *out_omap_digest;
     mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
+    mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
     uint64_t *out_truncate_seq;
     uint64_t *out_truncate_size;
     int *prval;
@@ -750,6 +735,7 @@ struct ObjectOperation {
                              uint32_t *dd,
                              uint32_t *od,
                              mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
+                             mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
                              uint64_t *otseq,
                              uint64_t *otsize,
                              int *r)
@@ -759,6 +745,7 @@ struct ObjectOperation {
        out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
        out_flags(flags), out_data_digest(dd), out_omap_digest(od),
        out_reqids(oreqids),
+       out_reqid_return_codes(oreqid_return_codes),
        out_truncate_seq(otseq),
        out_truncate_size(otsize),
        prval(r) {}
@@ -767,9 +754,9 @@ struct ObjectOperation {
       if (r < 0 && r != -ENOENT)
        return;
       try {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        object_copy_data_t copy_reply;
-       ::decode(copy_reply, p);
+       decode(copy_reply, p);
        if (r == -ENOENT) {
          if (out_reqids)
            *out_reqids = copy_reply.reqids;
@@ -799,6 +786,8 @@ struct ObjectOperation {
          *out_omap_digest = copy_reply.omap_digest;
        if (out_reqids)
          *out_reqids = copy_reply.reqids;
+       if (out_reqid_return_codes)
+         *out_reqid_return_codes = copy_reply.reqid_return_codes;
        if (out_truncate_seq)
          *out_truncate_seq = copy_reply.truncate_seq;
        if (out_truncate_size)
@@ -825,13 +814,14 @@ struct ObjectOperation {
                uint32_t *out_data_digest,
                uint32_t *out_omap_digest,
                mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
+               mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
                uint64_t *truncate_seq,
                uint64_t *truncate_size,
                int *prval) {
     OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
     osd_op.op.copy_get.max = max;
-    ::encode(*cursor, osd_op.indata);
-    ::encode(max, osd_op.indata);
+    encode(*cursor, osd_op.indata);
+    encode(max, osd_op.indata);
     unsigned p = ops.size() - 1;
     out_rval[p] = prval;
     C_ObjectOperation_copyget *h =
@@ -839,7 +829,8 @@ struct ObjectOperation {
                                    out_attrs, out_data, out_omap_header,
                                    out_omap_data, out_snaps, out_snap_seq,
                                    out_flags, out_data_digest,
-                                   out_omap_digest, out_reqids, truncate_seq,
+                                   out_omap_digest, out_reqids,
+                                   out_reqid_return_codes, truncate_seq,
                                    truncate_size, prval);
     out_bl[p] = &h->bl;
     out_handler[p] = h;
@@ -859,9 +850,9 @@ struct ObjectOperation {
       if (r < 0)
        return;
       try {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        bool isdirty;
-       ::decode(isdirty, p);
+       decode(isdirty, p);
        if (pisdirty)
          *pisdirty = isdirty;
       } catch (buffer::error& e) {
@@ -895,9 +886,9 @@ struct ObjectOperation {
       if (r < 0)
        return;
       try {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
-       ::decode(ls, p);
+       decode(ls, p);
        if (ptls) {
          ptls->clear();
          for (auto p = ls.begin(); p != ls.end(); ++p)
@@ -977,7 +968,7 @@ struct ObjectOperation {
 
   void omap_set(const map<string, bufferlist> &map) {
     bufferlist bl;
-    ::encode(map, bl);
+    encode(map, bl);
     add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
   }
 
@@ -991,7 +982,7 @@ struct ObjectOperation {
 
   void omap_rm_keys(const std::set<std::string> &to_remove) {
     bufferlist bl;
-    ::encode(to_remove, bl);
+    encode(to_remove, bl);
     add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
   }
 
@@ -1017,9 +1008,9 @@ struct ObjectOperation {
               bufferlist &bl, bufferlist *inbl) {
     OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
     osd_op.op.notify.cookie = cookie;
-    ::encode(prot_ver, *inbl);
-    ::encode(timeout, *inbl);
-    ::encode(bl, *inbl);
+    encode(prot_ver, *inbl);
+    encode(timeout, *inbl);
+    encode(bl, *inbl);
     osd_op.indata.append(*inbl);
   }
 
@@ -1027,9 +1018,9 @@ struct ObjectOperation {
                  bufferlist& reply_bl) {
     OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
     bufferlist bl;
-    ::encode(notify_id, bl);
-    ::encode(cookie, bl);
-    ::encode(reply_bl, bl);
+    encode(notify_id, bl);
+    encode(cookie, bl);
+    encode(reply_bl, bl);
     osd_op.indata.append(bl);
   }
 
@@ -1084,8 +1075,8 @@ struct ObjectOperation {
     osd_op.op.copy_from.src_version = src_version;
     osd_op.op.copy_from.flags = flags;
     osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
-    ::encode(src, osd_op.indata);
-    ::encode(src_oloc, osd_op.indata);
+    encode(src, osd_op.indata);
+    encode(src_oloc, osd_op.indata);
   }
 
   /**
@@ -1133,12 +1124,32 @@ struct ObjectOperation {
    * Extensible tier
    */
   void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc, 
-                   version_t tgt_version) {
+                   version_t tgt_version, int flag) {
     OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
     osd_op.op.copy_from.snapid = snapid;
     osd_op.op.copy_from.src_version = tgt_version;
-    ::encode(tgt, osd_op.indata);
-    ::encode(tgt_oloc, osd_op.indata);
+    encode(tgt, osd_op.indata);
+    encode(tgt_oloc, osd_op.indata);
+    set_last_op_flags(flag);
+  }
+
+  void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
+                object_t tgt_oid, uint64_t tgt_offset, int flag) {
+    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
+    encode(src_offset, osd_op.indata);
+    encode(src_length, osd_op.indata);
+    encode(tgt_oloc, osd_op.indata);
+    encode(tgt_oid, osd_op.indata);
+    encode(tgt_offset, osd_op.indata);
+    set_last_op_flags(flag);
+  }
+
+  void tier_promote() {
+    add_op(CEPH_OSD_OP_TIER_PROMOTE);
+  }
+
+  void unset_manifest() {
+    add_op(CEPH_OSD_OP_UNSET_MANIFEST);
   }
 
   void set_alloc_hint(uint64_t expected_object_size,
@@ -1185,7 +1196,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
 public:
   // config observer bits
   const char** get_tracked_conf_keys() const override;
-  void handle_conf_change(const struct md_config_t *conf,
+  void handle_conf_change(const ConfigProxy& conf,
                           const std::set <std::string> &changed) override;
 
 public:
@@ -1228,12 +1239,12 @@ private:
   version_t last_seen_osdmap_version;
   version_t last_seen_pgmap_version;
 
-  mutable boost::shared_mutex rwlock;
-  using lock_guard = std::unique_lock<decltype(rwlock)>;
+  mutable std::shared_mutex rwlock;
+  using lock_guard = std::lock_guard<decltype(rwlock)>;
   using unique_lock = std::unique_lock<decltype(rwlock)>;
   using shared_lock = boost::shared_lock<decltype(rwlock)>;
   using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
-  ceph::timer<ceph::mono_clock> timer;
+  ceph::timer<ceph::coarse_mono_clock> timer;
 
   PerfCounters *logger;
 
@@ -1250,7 +1261,6 @@ private:
 public:
   /*** track pending operations ***/
   // read
- public:
 
   struct OSDSession;
 
@@ -1277,6 +1287,7 @@ public:
     spg_t actual_pgid; ///< last (actual) spg_t we mapped to
     unsigned pg_num = 0; ///< last pg_num we mapped to
     unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
+    unsigned pg_num_pending = 0; ///< last pg_num we mapped to
     vector<int> up; ///< set of up osds for last pg we mapped to
     vector<int> acting; ///< set of acting osds for last pg we mapped to
     int up_primary = -1; ///< last up_primary we mapped to
@@ -1299,7 +1310,7 @@ public:
        base_oloc(oloc)
       {}
 
-    op_target_t(pg_t pgid)
+    explicit op_target_t(pg_t pgid)
       : base_oloc(pgid.pool(), pgid.ps()),
        precalc_pgid(true),
        base_pgid(pgid)
@@ -1355,11 +1366,11 @@ public:
     version_t *objver;
     epoch_t *reply_epoch;
 
-    ceph::mono_time stamp;
+    ceph::coarse_mono_time stamp;
 
     epoch_t map_dne_bound;
 
-    bool budgeted;
+    int budget;
 
     /// true if we should resend this message on failure
     bool should_resend;
@@ -1392,7 +1403,7 @@ public:
       objver(ov),
       reply_epoch(NULL),
       map_dne_bound(0),
-      budgeted(false),
+      budget(-1),
       should_resend(true),
       ctx_budgeted(false),
       data_offset(offset) {
@@ -1464,11 +1475,11 @@ public:
       psize(ps), pmtime(pm), fin(c) {}
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
+       auto p = bl.cbegin();
        uint64_t s;
        ceph::real_time m;
-       ::decode(s, p);
-       ::decode(m, p);
+       decode(s, p);
+       decode(m, p);
        if (psize)
          *psize = s;
        if (pmtime)
@@ -1486,8 +1497,8 @@ public:
                                                           fin(c) {}
     void finish(int r) override {
       if (r >= 0) {
-       bufferlist::iterator p = bl.begin();
-       ::decode(attrset, p);
+       auto p = bl.cbegin();
+       decode(attrset, p);
       }
       fin->complete(r);
     }
@@ -1556,7 +1567,7 @@ public:
     Context *onfinish;
     uint64_t ontimeout;
 
-    ceph::mono_time last_submit;
+    ceph::coarse_mono_time last_submit;
   };
 
   struct StatfsOp {
@@ -1566,7 +1577,7 @@ public:
     Context *onfinish;
     uint64_t ontimeout;
 
-    ceph::mono_time last_submit;
+    ceph::coarse_mono_time last_submit;
   };
 
   struct PoolOp {
@@ -1576,14 +1587,13 @@ public:
     Context *onfinish;
     uint64_t ontimeout;
     int pool_op;
-    uint64_t auid;
     int16_t crush_rule;
     snapid_t snapid;
     bufferlist *blp;
 
-    ceph::mono_time last_submit;
+    ceph::coarse_mono_time last_submit;
     PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
-              auid(0), crush_rule(0), snapid(0), blp(NULL) {}
+              crush_rule(0), snapid(0), blp(NULL) {}
   };
 
   // -- osd commands --
@@ -1607,7 +1617,7 @@ public:
 
     Context *onfinish = nullptr;
     uint64_t ontimeout = 0;
-    ceph::mono_time last_submit;
+    ceph::coarse_mono_time last_submit;
 
     CommandOp(
       int target_osd,
@@ -1676,9 +1686,9 @@ public:
     version_t *pobjver;
 
     bool is_watch;
-    ceph::mono_time watch_valid_thru; ///< send time for last acked ping
+    ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
     int last_error;  ///< error from last failed ping|reconnect, if any
-    boost::shared_mutex watch_lock;
+    std::shared_mutex watch_lock;
     using lock_guard = std::unique_lock<decltype(watch_lock)>;
     using unique_lock = std::unique_lock<decltype(watch_lock)>;
     using shared_lock = boost::shared_lock<decltype(watch_lock)>;
@@ -1686,7 +1696,7 @@ public:
 
     // queue of pending async operations, with the timestamp of
     // when they were queued.
-    list<ceph::mono_time> watch_pending_async;
+    list<ceph::coarse_mono_time> watch_pending_async;
 
     uint32_t register_gen;
     bool registered;
@@ -1702,40 +1712,43 @@ public:
 
     OSDSession *session;
 
+    Objecter *objecter;
+    int ctx_budget;
     ceph_tid_t register_tid;
     ceph_tid_t ping_tid;
     epoch_t map_dne_bound;
 
     void _queued_async() {
       // watch_lock ust be locked unique
-      watch_pending_async.push_back(ceph::mono_clock::now());
+      watch_pending_async.push_back(ceph::coarse_mono_clock::now());
     }
     void finished_async() {
       unique_lock l(watch_lock);
-      assert(!watch_pending_async.empty());
+      ceph_assert(!watch_pending_async.empty());
       watch_pending_async.pop_front();
     }
 
-    LingerOp() : linger_id(0),
-                target(object_t(), object_locator_t(), 0),
-                snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
-                is_watch(false), last_error(0),
-                register_gen(0),
-                registered(false),
-                canceled(false),
-                on_reg_commit(NULL),
-                on_notify_finish(NULL),
-                notify_result_bl(NULL),
-                notify_id(0),
-                watch_context(NULL),
-                session(NULL),
-                register_tid(0),
-                ping_tid(0),
-                map_dne_bound(0) {}
-
-    // no copy!
-    const LingerOp &operator=(const LingerOp& r);
-    LingerOp(const LingerOp& o);
+    explicit LingerOp(Objecter *o) : linger_id(0),
+                           target(object_t(), object_locator_t(), 0),
+                           snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
+                           is_watch(false), last_error(0),
+                           register_gen(0),
+                           registered(false),
+                           canceled(false),
+                           on_reg_commit(NULL),
+                           on_notify_finish(NULL),
+                           notify_result_bl(NULL),
+                           notify_id(0),
+                           watch_context(NULL),
+                           session(NULL),
+                           objecter(o),
+                           ctx_budget(-1),
+                           register_tid(0),
+                           ping_tid(0),
+                           map_dne_bound(0) {}
+
+    const LingerOp &operator=(const LingerOp& r) = delete;
+    LingerOp(const LingerOp& o) = delete;
 
     uint64_t get_cookie() {
       return reinterpret_cast<uint64_t>(this);
@@ -1779,7 +1792,7 @@ public:
   struct C_Linger_Ping : public Context {
     Objecter *objecter;
     LingerOp *info;
-    ceph::mono_time sent;
+    ceph::coarse_mono_time sent;
     uint32_t register_gen;
     C_Linger_Ping(Objecter *o, LingerOp *l)
       : objecter(o), info(l), register_gen(info->register_gen) {
@@ -1810,7 +1823,7 @@ public:
   };
 
   struct OSDSession : public RefCountedObject {
-    boost::shared_mutex lock;
+    std::shared_mutex lock;
     using lock_guard = std::lock_guard<decltype(lock)>;
     using unique_lock = std::unique_lock<decltype(lock)>;
     using shared_lock = boost::shared_lock<decltype(lock)>;
@@ -1885,10 +1898,9 @@ public:
   ceph::timespan osd_timeout;
 
   MOSDOp *_prepare_osd_op(Op *op);
-  void _send_op(Op *op, MOSDOp *m = NULL);
+  void _send_op(Op *op);
   void _send_op_account(Op *op);
   void _cancel_linger_op(Op *op);
-  void finish_op(OSDSession *session, ceph_tid_t tid);
   void _finish_op(Op *op, int r);
   static bool is_pg_changed(
     int oldprimary,
@@ -1905,6 +1917,9 @@ public:
   };
   bool _osdmap_full_flag() const;
   bool _osdmap_has_pool_full() const;
+  void _prune_snapc(
+    const mempool::osdmap::map<int64_t, OSDMap::snap_interval_set_t>& new_removed_snaps,
+    Op *op);
 
   bool target_should_be_paused(op_target_t *op);
   int _calc_target(op_target_t *t, Connection *con,
@@ -1929,7 +1944,7 @@ public:
   void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
   void _linger_reconnect(LingerOp *info, int r);
   void _send_linger_ping(LingerOp *info);
-  void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
+  void _linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
                    uint32_t register_gen);
   int _normalize_watch_error(int r);
 
@@ -1950,7 +1965,6 @@ private:
   void _send_command_map_check(CommandOp *op);
   void _command_cancel_map_check(CommandOp *op);
 
-  void kick_requests(OSDSession *session);
   void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
   void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
 
@@ -1971,30 +1985,27 @@ private:
    * and returned whenever an op is removed from the map
    * If throttle_op needs to throttle it will unlock client_lock.
    */
-  int calc_op_budget(Op *op);
+  int calc_op_budget(const vector<OSDOp>& ops);
   void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
   int _take_op_budget(Op *op, shunique_lock& sul) {
-    assert(sul && sul.mutex() == &rwlock);
-    int op_budget = calc_op_budget(op);
+    ceph_assert(sul && sul.mutex() == &rwlock);
+    int op_budget = calc_op_budget(op->ops);
     if (keep_balanced_budget) {
       _throttle_op(op, sul, op_budget);
-    } else {
+    } else { // update take_linger_budget to match this!
       op_throttle_bytes.take(op_budget);
       op_throttle_ops.take(1);
     }
-    op->budgeted = true;
+    op->budget = op_budget;
     return op_budget;
   }
+  int take_linger_budget(LingerOp *info);
+  friend class WatchContext; // to invoke put_up_budget_bytes
   void put_op_budget_bytes(int op_budget) {
-    assert(op_budget >= 0);
+    ceph_assert(op_budget >= 0);
     op_throttle_bytes.put(op_budget);
     op_throttle_ops.put(1);
   }
-  void put_op_budget(Op *op) {
-    assert(op->budgeted);
-    int op_budget = calc_op_budget(op);
-    put_op_budget_bytes(op_budget);
-  }
   void put_nlist_context_budget(NListContext *list_context);
   Throttle op_throttle_bytes, op_throttle_ops;
 
@@ -2062,14 +2073,16 @@ private:
   void set_osdmap_full_try() { osdmap_full_try = true; }
   void unset_osdmap_full_try() { osdmap_full_try = false; }
 
-  void _scan_requests(OSDSession *s,
-                     bool force_resend,
-                     bool cluster_full,
-                     map<int64_t, bool> *pool_full_map,
-                     map<ceph_tid_t, Op*>& need_resend,
-                     list<LingerOp*>& need_resend_linger,
-                     map<ceph_tid_t, CommandOp*>& need_resend_command,
-                     shunique_lock& sul);
+  void _scan_requests(
+    OSDSession *s,
+    bool skipped_map,
+    bool cluster_full,
+    map<int64_t, bool> *pool_full_map,
+    map<ceph_tid_t, Op*>& need_resend,
+    list<LingerOp*>& need_resend_linger,
+    map<ceph_tid_t, CommandOp*>& need_resend_command,
+    shunique_lock& sul,
+    const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps);
 
   int64_t get_object_hash_position(int64_t pool, const string& key,
                                   const string& ns);
@@ -2131,8 +2144,6 @@ private:
   void _op_submit_with_budget(Op *op, shunique_lock& lc,
                              ceph_tid_t *ptid,
                              int *ctx_budget = NULL);
-  inline void unregister_op(Op *op);
-
   // public interface
 public:
   void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
@@ -2168,7 +2179,6 @@ public:
   void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
   void wait_for_latest_osdmap(Context *fin);
   void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
-  void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
 
   /** Get the current set of global op flags */
   int get_global_op_flags() const { return global_op_flags; }
@@ -2203,7 +2213,7 @@ public:
   void osd_command(int osd, const std::vector<string>& cmd,
                  const bufferlist& inbl, ceph_tid_t *ptid,
                  bufferlist *poutbl, string *prs, Context *onfinish) {
-    assert(osd >= 0);
+    ceph_assert(osd >= 0);
     CommandOp *c = new CommandOp(
       osd,
       cmd,
@@ -2518,7 +2528,7 @@ public:
     ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
     ops[i].op.xattr.value_len = 0;
     if (name)
-      ops[i].indata.append(name);
+      ops[i].indata.append(name, ops[i].op.xattr.name_len);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
     o->snapid = snap;
@@ -2836,7 +2846,7 @@ public:
     ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
     ops[i].op.xattr.value_len = bl.length();
     if (name)
-      ops[i].indata.append(name);
+      ops[i].indata.append(name, ops[i].op.xattr.name_len);
     ops[i].indata.append(bl);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
@@ -2857,7 +2867,7 @@ public:
     ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
     ops[i].op.xattr.value_len = 0;
     if (name)
-      ops[i].indata.append(name);
+      ops[i].indata.append(name, ops[i].op.xattr.name_len);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
@@ -2912,11 +2922,10 @@ public:
   int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
   int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);
 
-  int create_pool(string& name, Context *onfinish, uint64_t auid=0,
+  int create_pool(string& name, Context *onfinish,
                  int crush_rule=-1);
   int delete_pool(int64_t pool, Context *onfinish);
   int delete_pool(const string& name, Context *onfinish);
-  int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
 
   void handle_pool_op_reply(MPoolOpReply *m);
   int pool_op_cancel(ceph_tid_t tid, int r);
@@ -3016,7 +3025,7 @@ public:
             bit != p->buffer_extents.end();
             ++bit)
          bl.copy(bit->first, bit->second, cur);
-       assert(cur.length() == p->length);
+       ceph_assert(cur.length() == p->length);
        write_trunc(p->oid, p->oloc, p->offset, p->length,
              snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
              oncommit ? gcom.new_sub():0,
@@ -3038,8 +3047,7 @@ public:
   void ms_handle_remote_reset(Connection *con) override;
   bool ms_handle_refused(Connection *con) override;
   bool ms_get_authorizer(int dest_type,
-                        AuthAuthorizer **authorizer,
-                        bool force_new) override;
+                        AuthAuthorizer **authorizer) override;
 
   void blacklist_self(bool set);