]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osdc/Objecter.cc
update sources to 12.2.8
[ceph.git] / ceph / src / osdc / Objecter.cc
index 050c762c7cd3b7236f31636db6cb98cf517a82cd..c7851e91c8e53e01dca04c628955def4ee84d96e 100644 (file)
@@ -237,7 +237,7 @@ void Objecter::update_crush_location()
  */
 void Objecter::init()
 {
-  assert(!initialized.read());
+  assert(!initialized);
 
   if (!logger) {
     PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
@@ -246,7 +246,7 @@ void Objecter::init()
                PerfCountersBuilder::PRIO_CRITICAL);
     pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
     pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
-    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
+    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(BYTES));
     pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
     pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
 
@@ -387,7 +387,7 @@ void Objecter::init()
 
   cct->_conf->add_observer(this);
 
-  initialized.set(1);
+  initialized = true;
 }
 
 /*
@@ -407,11 +407,11 @@ void Objecter::start(const OSDMap* o)
 
 void Objecter::shutdown()
 {
-  assert(initialized.read());
+  assert(initialized);
 
   unique_lock wl(rwlock);
 
-  initialized.set(0);
+  initialized = false;
 
   cct->_conf->remove_observer(this);
 
@@ -562,7 +562,7 @@ void Objecter::_send_linger(LingerOp *info,
   o->mtime = info->mtime;
 
   o->target = info->target;
-  o->tid = last_tid.inc();
+  o->tid = ++last_tid;
 
   // do not resend this; we will send a new op to reregister
   o->should_resend = false;
@@ -594,6 +594,10 @@ void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
     info->on_reg_commit->complete(r);
     info->on_reg_commit = NULL;
   }
+  if (r < 0 && info->on_notify_finish) {
+    info->on_notify_finish->complete(r);
+    info->on_notify_finish = nullptr;
+  }
 
   // only tell the user the first time we do this
   info->registered = true;
@@ -694,7 +698,7 @@ void Objecter::_send_linger_ping(LingerOp *info)
   o->should_resend = false;
   _send_op_account(o);
   MOSDOp *m = _prepare_osd_op(o);
-  o->tid = last_tid.inc();
+  o->tid = ++last_tid;
   _session_op_assign(info->session, o);
   _send_op(o, m);
   info->ping_tid = o->tid;
@@ -888,7 +892,7 @@ struct C_DoWatchNotify : public Context {
 void Objecter::handle_watch_notify(MWatchNotify *m)
 {
   shared_lock l(rwlock);
-  if (!initialized.read()) {
+  if (!initialized) {
     return;
   }
 
@@ -931,7 +935,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   ldout(cct, 10) << __func__ << " " << *m << dendl;
 
   shared_lock l(rwlock);
-  assert(initialized.read());
+  assert(initialized);
 
   if (info->canceled) {
     l.unlock();
@@ -961,7 +965,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
 bool Objecter::ms_dispatch(Message *m)
 {
   ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
-  if (!initialized.read())
+  if (!initialized)
     return false;
 
   switch (m->get_type()) {
@@ -1134,7 +1138,7 @@ void Objecter::_scan_requests(OSDSession *s,
 void Objecter::handle_osd_map(MOSDMap *m)
 {
   shunique_lock sul(rwlock, acquire_unique);
-  if (!initialized.read())
+  if (!initialized)
     return;
 
   assert(osdmap);
@@ -1182,11 +1186,20 @@ void Objecter::handle_osd_map(MOSDMap *m)
                        << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);
+
+          emit_blacklist_events(inc);
+
          logger->inc(l_osdc_map_inc);
        }
        else if (m->maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
-         osdmap->decode(m->maps[e]);
+          OSDMap *new_osdmap = new OSDMap();
+          new_osdmap->decode(m->maps[e]);
+
+          emit_blacklist_events(*osdmap, *new_osdmap);
+
+          osdmap = new_osdmap;
+
          logger->inc(l_osdc_map_full);
        }
        else {
@@ -1317,7 +1330,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
     if (!op->session) {
       _calc_target(&op->target, nullptr);
       OSDSession *s = NULL;
-      int const r = _get_session(op->target.osd, &s, sul);
+      const int r = _get_session(op->target.osd, &s, sul);
       assert(r == 0);
       assert(s != NULL);
       op->session = s;
@@ -1361,6 +1374,58 @@ void Objecter::handle_osd_map(MOSDMap *m)
   }
 }
 
+void Objecter::enable_blacklist_events()
+{
+  unique_lock wl(rwlock);
+
+  blacklist_events_enabled = true;
+}
+
+void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
+{
+  unique_lock wl(rwlock);
+
+  if (events->empty()) {
+    events->swap(blacklist_events);
+  } else {
+    for (const auto &i : blacklist_events) {
+      events->insert(i);
+    }
+    blacklist_events.clear();
+  }
+}
+
+void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
+{
+  if (!blacklist_events_enabled) {
+    return;
+  }
+
+  for (const auto &i : inc.new_blacklist) {
+    blacklist_events.insert(i.first);
+  }
+}
+
+void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
+                                     const OSDMap &new_osd_map)
+{
+  if (!blacklist_events_enabled) {
+    return;
+  }
+
+  std::set<entity_addr_t> old_set;
+  std::set<entity_addr_t> new_set;
+
+  old_osd_map.get_blacklist(&old_set);
+  new_osd_map.get_blacklist(&new_set);
+
+  std::set<entity_addr_t> delta_set;
+  std::set_difference(
+      new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
+      std::inserter(delta_set, delta_set.begin()));
+  blacklist_events.insert(delta_set.begin(), delta_set.end());
+}
+
 // op pool check
 
 void Objecter::C_Op_Map_Latest::finish(int r)
@@ -1578,8 +1643,14 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
   }
   if (op->map_dne_bound > 0) {
     if (osdmap->get_epoch() >= op->map_dne_bound) {
+      LingerOp::unique_lock wl{op->watch_lock};
       if (op->on_reg_commit) {
        op->on_reg_commit->complete(-ENOENT);
+       op->on_reg_commit = nullptr;
+      }
+      if (op->on_notify_finish) {
+        op->on_notify_finish->complete(-ENOENT);
+        op->on_notify_finish = nullptr;
       }
       *need_unregister = true;
     }
@@ -1635,7 +1706,9 @@ void Objecter::C_Command_Map_Latest::finish(int r)
   if (c->map_dne_bound == 0)
     c->map_dne_bound = latest;
 
+  OSDSession::unique_lock sul(c->session->lock);
   objecter->_check_command_map_dne(c);
+  sul.unlock();
 
   c->put();
 }
@@ -1643,6 +1716,7 @@ void Objecter::C_Command_Map_Latest::finish(int r)
 void Objecter::_check_command_map_dne(CommandOp *c)
 {
   // rwlock is locked unique
+  // session is locked unique
 
   ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
                 << " current " << osdmap->get_epoch()
@@ -1660,6 +1734,7 @@ void Objecter::_check_command_map_dne(CommandOp *c)
 void Objecter::_send_command_map_check(CommandOp *c)
 {
   // rwlock is locked unique
+  // session is locked unique
 
   // ask the monitor
   if (check_latest_map_commands.count(c->tid) == 0) {
@@ -2056,7 +2131,7 @@ void Objecter::tick()
   // we are only called by C_Tick
   tick_event = 0;
 
-  if (!initialized.read()) {
+  if (!initialized) {
     // we raced with shutdown
     ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
     return;
@@ -2112,7 +2187,7 @@ void Objecter::tick()
     if (found)
       toping.insert(s);
   }
-  if (num_homeless_ops.read() || !toping.empty()) {
+  if (num_homeless_ops || !toping.empty()) {
     _maybe_request_map();
   }
 
@@ -2130,7 +2205,7 @@ void Objecter::tick()
   }
 
   // Make sure we don't resechedule if we wake up after shutdown
-  if (initialized.read()) {
+  if (initialized) {
     tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
   }
@@ -2195,6 +2270,7 @@ void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
   ceph_tid_t tid = 0;
   if (!ptid)
     ptid = &tid;
+  op->trace.event("op submit");
   _op_submit_with_budget(op, rl, ptid, ctx_budget);
 }
 
@@ -2202,7 +2278,7 @@ void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
 {
-  assert(initialized.read());
+  assert(initialized);
 
   assert(op->ops.size() == op->out_bl.size());
   assert(op->ops.size() == op->out_rval.size());
@@ -2221,7 +2297,7 @@ void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
 
   if (osd_timeout > timespan(0)) {
     if (op->tid == 0)
-      op->tid = last_tid.inc();
+      op->tid = ++last_tid;
     auto tid = op->tid;
     op->ontimeout = timer.add_event(osd_timeout,
                                    [this, tid]() {
@@ -2233,11 +2309,11 @@ void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
 
 void Objecter::_send_op_account(Op *op)
 {
-  inflight_ops.inc();
+  inflight_ops++;
 
   // add to gather set(s)
   if (op->onfinish) {
-    num_in_flight.inc();
+    num_in_flight++;
   } else {
     ldout(cct, 20) << " note: not requesting reply" << dendl;
   }
@@ -2396,7 +2472,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
 
   OSDSession::unique_lock sl(s->lock);
   if (op->tid == 0)
-    op->tid = last_tid.inc();
+    op->tid = ++last_tid;
 
   ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
                 << " '" << op->target.base_oloc << "' '"
@@ -2423,12 +2499,12 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
   sl.unlock();
   put_session(s);
 
-  ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
+  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
 }
 
 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(initialized.read());
+  assert(initialized);
 
   OSDSession::unique_lock sl(s->lock);
 
@@ -2449,7 +2525,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
                 << dendl;
   Op *op = p->second;
   if (op->onfinish) {
-    num_in_flight.dec();
+    num_in_flight--;
     op->onfinish->complete(r);
     op->onfinish = NULL;
   }
@@ -2470,6 +2546,16 @@ int Objecter::op_cancel(ceph_tid_t tid, int r)
   return ret;
 }
 
+int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
+{
+  unique_lock wl(rwlock);
+  ldout(cct,10) << __func__ << " " << tids << dendl;
+  for (auto tid : tids) {
+    _op_cancel(tid, r);
+  }
+  return 0;
+}
+
 int Objecter::_op_cancel(ceph_tid_t tid, int r)
 {
   int ret = 0;
@@ -2759,6 +2845,7 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
   osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
                               &acting, &acting_primary);
   bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
+  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
   unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
   pg_t prev_pgid(prev_seed, pgid.pool());
   if (any_change && PastIntervals::is_new_interval(
@@ -2778,6 +2865,8 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
        pg_num,
        t->sort_bitwise,
        sort_bitwise,
+       t->recovery_deletes,
+       recovery_deletes,
        prev_pgid)) {
     force_resend = true;
   }
@@ -2812,6 +2901,7 @@ int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
       pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
       &t->actual_pgid);
     t->sort_bitwise = sort_bitwise;
+    t->recovery_deletes = recovery_deletes;
     ldout(cct, 10) << __func__ << " "
                   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
                   << " acting " << acting
@@ -2886,7 +2976,7 @@ void Objecter::_session_op_assign(OSDSession *to, Op *op)
   to->ops[op->tid] = op;
 
   if (to->is_homeless()) {
-    num_homeless_ops.inc();
+    num_homeless_ops++;
   }
 
   ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
@@ -2898,7 +2988,7 @@ void Objecter::_session_op_remove(OSDSession *from, Op *op)
   // from->lock is locked
 
   if (from->is_homeless()) {
-    num_homeless_ops.dec();
+    num_homeless_ops--;
   }
 
   from->ops.erase(op->tid);
@@ -2914,7 +3004,7 @@ void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
   assert(op->session == NULL);
 
   if (to->is_homeless()) {
-    num_homeless_ops.inc();
+    num_homeless_ops++;
   }
 
   get_session(to);
@@ -2931,7 +3021,7 @@ void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
   // from->lock is locked unique
 
   if (from->is_homeless()) {
-    num_homeless_ops.dec();
+    num_homeless_ops--;
   }
 
   from->linger_ops.erase(op->linger_id);
@@ -2948,7 +3038,7 @@ void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
   // from->lock is locked
 
   if (from->is_homeless()) {
-    num_homeless_ops.dec();
+    num_homeless_ops--;
   }
 
   from->command_ops.erase(op->tid);
@@ -2965,7 +3055,7 @@ void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
   assert(op->tid);
 
   if (to->is_homeless()) {
-    num_homeless_ops.inc();
+    num_homeless_ops++;
   }
 
   get_session(to);
@@ -3013,7 +3103,7 @@ void Objecter::_cancel_linger_op(Op *op)
   assert(!op->should_resend);
   if (op->onfinish) {
     delete op->onfinish;
-    num_in_flight.dec();
+    num_in_flight--;
   }
 
   _finish_op(op, 0);
@@ -3039,7 +3129,7 @@ void Objecter::_finish_op(Op *op, int r)
 
   assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
 
-  inflight_ops.dec();
+  inflight_ops--;
 
   op->put();
 }
@@ -3078,7 +3168,7 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
   op->stamp = ceph::mono_clock::now();
 
   hobject_t hobj = op->target.get_hobj();
-  MOSDOp *m = new MOSDOp(client_inc.read(), op->tid,
+  MOSDOp *m = new MOSDOp(client_inc, op->tid,
                         hobj, op->target.actual_pgid,
                         osdmap->get_epoch(),
                         flags, op->features);
@@ -3090,9 +3180,10 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
   m->ops = op->ops;
   m->set_mtime(op->mtime);
   m->set_retry_attempt(op->attempts++);
-  m->trace = op->trace;
-  if (!m->trace && cct->_conf->osdc_blkin_trace_all)
-    m->trace.init("objecter op", &trace_endpoint);
+
+  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
+    op->trace.init("op", &trace_endpoint);
+  }
 
   if (op->priority)
     m->set_priority(op->priority);
@@ -3104,7 +3195,11 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
   }
 
   logger->inc(l_osdc_op_send);
-  logger->inc(l_osdc_op_send_bytes, m->get_data().length());
+  ssize_t sum = 0;
+  for (unsigned i = 0; i < m->ops.size(); i++) {
+    sum += m->ops[i].indata.length();
+  }
+  logger->inc(l_osdc_op_send_bytes, sum);
 
   return m;
 }
@@ -3115,9 +3210,9 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
   // op->session->lock is locked
 
   // backoff?
-  hobject_t hoid = op->target.get_hobj();
   auto p = op->session->backoffs.find(op->target.actual_pgid);
   if (p != op->session->backoffs.end()) {
+    hobject_t hoid = op->target.get_hobj();
     auto q = p->second.lower_bound(hoid);
     if (q != p->second.begin()) {
       --q;
@@ -3177,6 +3272,9 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
 
   m->set_tid(op->tid);
 
+  if (op->trace.valid()) {
+    m->trace.init("op msg", nullptr, &op->trace);
+  }
   op->session->con->send_message(m);
 }
 
@@ -3235,7 +3333,7 @@ void Objecter::unregister_op(Op *op)
   put_session(op->session);
   op->session = NULL;
 
-  inflight_ops.dec();
+  inflight_ops--;
 }
 
 /* This function DOES put the passed message before returning */
@@ -3247,7 +3345,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   ceph_tid_t tid = m->get_tid();
 
   shunique_lock sul(rwlock, ceph::acquire_shared);
-  if (!initialized.read()) {
+  if (!initialized) {
     m->put();
     return;
   }
@@ -3285,12 +3383,13 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                << " attempt " << m->get_retry_attempt()
                << dendl;
   Op *op = iter->second;
+  op->trace.event("osd op reply");
 
   if (retry_writes_after_first_reply && op->attempts == 1 &&
       (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
     ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
     if (op->onfinish) {
-      num_in_flight.dec();
+      num_in_flight--;
     }
     _session_op_remove(s, op);
     sl.unlock();
@@ -3326,7 +3425,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   if (m->is_redirect_reply()) {
     ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
     if (op->onfinish)
-      num_in_flight.dec();
+      num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
     put_session(s);
@@ -3336,7 +3435,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     op->tid = 0;
     m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
-    op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
+    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED | CEPH_OSD_FLAG_IGNORE_OVERLAY);
     _op_submit(op, sul, NULL);
     m->put();
     return;
@@ -3344,14 +3443,17 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   if (rc == -EAGAIN) {
     ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
-
-    // new tid
-    s->ops.erase(op->tid);
-    op->tid = last_tid.inc();
-
-    _send_op(op);
+    if (op->onfinish)
+      num_in_flight--;
+    _session_op_remove(s, op);
     sl.unlock();
     put_session(s);
+
+    op->tid = 0;
+    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+                         CEPH_OSD_FLAG_LOCALIZE_READS);
+    op->target.pgid = pg_t();
+    _op_submit(op, sul, NULL);
     m->put();
     return;
   }
@@ -3398,10 +3500,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     // set rval before running handlers so that handlers
     // can change it if e.g. decoding fails
     if (*pr)
-      **pr = ceph_to_host_errno(p->rval);
+      **pr = ceph_to_hostos_errno(p->rval);
     if (*ph) {
       ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
-      (*ph)->complete(ceph_to_host_errno(p->rval));
+      (*ph)->complete(ceph_to_hostos_errno(p->rval));
       *ph = NULL;
     }
   }
@@ -3410,7 +3512,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   // only ever get back one (type of) ack ever.
 
   if (op->onfinish) {
-    num_in_flight.dec();
+    num_in_flight--;
     onfinish = op->onfinish;
     op->onfinish = NULL;
   }
@@ -3422,7 +3524,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
   _finish_op(op, 0);
 
-  ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
+  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
 
   // serialize completions
   if (completion_lock.mutex()) {
@@ -3446,7 +3548,7 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
 {
   ldout(cct, 10) << __func__ << " " << *m << dendl;
   shunique_lock sul(rwlock, ceph::acquire_shared);
-  if (!initialized.read()) {
+  if (!initialized) {
     m->put();
     return;
   }
@@ -3741,7 +3843,7 @@ int Objecter::create_pool_snap(int64_t pool, string& snap_name,
   PoolOp *op = new PoolOp;
   if (!op)
     return -ENOMEM;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = pool;
   op->name = snap_name;
   op->onfinish = onfinish;
@@ -3774,7 +3876,7 @@ int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
   ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = pool;
   C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
   op->onfinish = fin;
@@ -3802,7 +3904,7 @@ int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
   PoolOp *op = new PoolOp;
   if (!op)
     return -ENOMEM;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = pool;
   op->name = snap_name;
   op->onfinish = onfinish;
@@ -3822,7 +3924,7 @@ int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
                 << snap << dendl;
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = pool;
   op->onfinish = onfinish;
   op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
@@ -3846,7 +3948,7 @@ int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
   PoolOp *op = new PoolOp;
   if (!op)
     return -ENOMEM;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = 0;
   op->name = name;
   op->onfinish = onfinish;
@@ -3888,7 +3990,7 @@ int Objecter::delete_pool(const string &pool_name, Context *onfinish)
 void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
 {
   PoolOp *op = new PoolOp;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = pool;
   op->name = "delete";
   op->onfinish = onfinish;
@@ -3909,7 +4011,7 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
   ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pool = pool;
   op->name = "change_pool_auid";
   op->onfinish = onfinish;
@@ -3961,7 +4063,7 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
 {
   FUNCTRACE();
   shunique_lock sul(rwlock, acquire_shared);
-  if (!initialized.read()) {
+  if (!initialized) {
     sul.unlock();
     m->put();
     return;
@@ -4024,7 +4126,7 @@ done:
 
 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized.read());
+  assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4066,7 +4168,7 @@ void Objecter::get_pool_stats(list<string>& pools,
   ldout(cct, 10) << "get_pool_stats " << pools << dendl;
 
   PoolStatOp *op = new PoolStatOp;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->pools = pools;
   op->pool_stats = result;
   op->onfinish = onfinish;
@@ -4105,7 +4207,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
   ceph_tid_t tid = m->get_tid();
 
   unique_lock wl(rwlock);
-  if (!initialized.read()) {
+  if (!initialized) {
     m->put();
     return;
   }
@@ -4129,7 +4231,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
 
 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized.read());
+  assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4161,14 +4263,17 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
   delete op;
 }
 
-void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
+void Objecter::get_fs_stats(ceph_statfs& result,
+                           boost::optional<int64_t> data_pool,
+                           Context *onfinish)
 {
   ldout(cct, 10) << "get_fs_stats" << dendl;
   unique_lock l(rwlock);
 
   StatfsOp *op = new StatfsOp;
-  op->tid = last_tid.inc();
+  op->tid = ++last_tid;
   op->stats = &result;
+  op->data_pool = data_pool;
   op->onfinish = onfinish;
   if (mon_timeout > timespan(0)) {
     op->ontimeout = timer.add_event(mon_timeout,
@@ -4191,6 +4296,7 @@ void Objecter::_fs_stats_submit(StatfsOp *op)
 
   ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
   monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
+                                    op->data_pool,
                                     last_seen_pgmap_version));
   op->last_submit = ceph::mono_clock::now();
 
@@ -4200,7 +4306,7 @@ void Objecter::_fs_stats_submit(StatfsOp *op)
 void Objecter::handle_fs_stats_reply(MStatfsReply *m)
 {
   unique_lock wl(rwlock);
-  if (!initialized.read()) {
+  if (!initialized) {
     m->put();
     return;
   }
@@ -4225,7 +4331,7 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
 
 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(initialized.read());
+  assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4294,7 +4400,7 @@ void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
 void Objecter::ms_handle_connect(Connection *con)
 {
   ldout(cct, 10) << "ms_handle_connect " << con << dendl;
-  if (!initialized.read())
+  if (!initialized)
     return;
 
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
@@ -4303,7 +4409,7 @@ void Objecter::ms_handle_connect(Connection *con)
 
 bool Objecter::ms_handle_reset(Connection *con)
 {
-  if (!initialized.read())
+  if (!initialized)
     return false;
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
     OSDSession *session = static_cast<OSDSession*>(con->get_priv());
@@ -4311,7 +4417,7 @@ bool Objecter::ms_handle_reset(Connection *con)
       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
       unique_lock wl(rwlock);
-      if (!initialized.read()) {
+      if (!initialized) {
        wl.unlock();
        return false;
       }
@@ -4354,7 +4460,7 @@ bool Objecter::ms_get_authorizer(int dest_type,
                                 AuthAuthorizer **authorizer,
                                 bool force_new)
 {
-  if (!initialized.read())
+  if (!initialized)
     return false;
   if (dest_type == CEPH_ENTITY_TYPE_MON)
     return true;
@@ -4390,7 +4496,7 @@ void Objecter::_dump_active(OSDSession *s)
 
 void Objecter::_dump_active()
 {
-  ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless"
+  ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
                 << dendl;
   for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
        siter != osd_sessions.end(); ++siter) {
@@ -4632,7 +4738,7 @@ void Objecter::blacklist_self(bool set)
 void Objecter::handle_command_reply(MCommandReply *m)
 {
   unique_lock wl(rwlock);
-  if (!initialized.read()) {
+  if (!initialized) {
     m->put();
     return;
   }
@@ -4678,8 +4784,10 @@ void Objecter::handle_command_reply(MCommandReply *m)
 
   sl.unlock();
 
-
+  OSDSession::unique_lock sul(s->lock);
   _finish_command(c, m->r, m->rs);
+  sul.unlock();
+
   m->put();
   if (s)
     s->put();
@@ -4689,7 +4797,7 @@ void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
 {
   shunique_lock sul(rwlock, ceph::acquire_unique);
 
-  ceph_tid_t tid = last_tid.inc();
+  ceph_tid_t tid = ++last_tid;
   ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
   c->tid = tid;
 
@@ -4812,7 +4920,7 @@ void Objecter::_send_command(CommandOp *c)
 
 int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(initialized.read());
+  assert(initialized);
 
   unique_lock wl(rwlock);
 
@@ -4826,13 +4934,16 @@ int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 
   CommandOp *op = it->second;
   _command_cancel_map_check(op);
+  OSDSession::unique_lock sl(op->session->lock);
   _finish_command(op, r, "");
+  sl.unlock();
   return 0;
 }
 
 void Objecter::_finish_command(CommandOp *c, int r, string rs)
 {
   // rwlock is locked unique
+  // session lock is locked
 
   ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
                 << rs << dendl;
@@ -4845,9 +4956,7 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs)
     timer.cancel_event(c->ontimeout);
 
   OSDSession *s = c->session;
-  OSDSession::unique_lock sl(s->lock);
   _session_command_op_remove(c->session, c);
-  sl.unlock();
 
   c->put();
 
@@ -4868,7 +4977,7 @@ Objecter::~Objecter()
   delete osdmap;
 
   assert(homeless_session->get_nref() == 1);
-  assert(num_homeless_ops.read() == 0);
+  assert(num_homeless_ops == 0);
   homeless_session->put();
 
   assert(osd_sessions.empty());