]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/client/Client.cc
update sources to 12.2.10
[ceph.git] / ceph / src / client / Client.cc
index 6b34e4a330e43df4983fc48be64fd78045675a49..fc99ad53bef945acddac43cc971a78a4e17319bf 100644 (file)
@@ -83,6 +83,7 @@
 #include "Client.h"
 #include "Inode.h"
 #include "Dentry.h"
+#include "Delegation.h"
 #include "Dir.h"
 #include "ClientSnapRealm.h"
 #include "Fh.h"
@@ -235,10 +236,8 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
     remount_cb(NULL),
     ino_invalidate_cb(NULL),
     dentry_invalidate_cb(NULL),
-    getgroups_cb(NULL),
     umask_cb(NULL),
     can_invalidate_dentries(false),
-    require_remount(false),
     async_ino_invalidator(m->cct),
     async_dentry_invalidator(m->cct),
     interrupt_finisher(m->cct),
@@ -251,9 +250,10 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
     last_tid(0), oldest_tid(0), last_flush_tid(1),
     initialized(false),
     mounted(false), unmounting(false), blacklisted(false),
-    local_osd(-1), local_osd_epoch(0),
+    local_osd(-ENXIO), local_osd_epoch(0),
     unsafe_sync_write(0),
-    client_lock("Client::client_lock")
+    client_lock("Client::client_lock"),
+    deleg_timeout(0)
 {
   _reset_faked_inos();
   //
@@ -271,7 +271,6 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
   if (cct->_conf->client_acl_type == "posix_acl")
     acl_type = POSIX_ACL;
 
-  lru.lru_set_max(cct->_conf->client_cache_size);
   lru.lru_set_midpoint(cct->_conf->client_cache_mid);
 
   // file handles
@@ -331,7 +330,6 @@ void Client::tear_down_cache()
   // *** FIXME ***
 
   // empty lru
-  lru.lru_set_max(0);
   trim_cache();
   assert(lru.lru_get_size() == 0);
 
@@ -450,6 +448,10 @@ void Client::dump_status(Formatter *f)
     f->dump_int("dentry_count", lru.lru_get_size());
     f->dump_int("dentry_pinned_count", lru.lru_get_num_pinned());
     f->dump_int("id", get_nodeid().v);
+    entity_inst_t inst(messenger->get_myname(), messenger->get_myaddr());
+    f->dump_object("inst", inst);
+    f->dump_stream("inst_str") << inst;
+    f->dump_stream("addr_str") << inst.addr;
     f->dump_int("inode_count", inode_map.size());
     f->dump_int("mds_epoch", mdsmap->get_epoch());
     f->dump_int("osd_epoch", osd_epoch);
@@ -600,12 +602,13 @@ void Client::shutdown()
 
 void Client::trim_cache(bool trim_kernel_dcache)
 {
-  ldout(cct, 20) << "trim_cache size " << lru.lru_get_size() << " max " << lru.lru_get_max() << dendl;
+  uint64_t max = cct->_conf->client_cache_size;
+  ldout(cct, 20) << "trim_cache size " << lru.lru_get_size() << " max " << max << dendl;
   unsigned last = 0;
   while (lru.lru_get_size() != last) {
     last = lru.lru_get_size();
 
-    if (lru.lru_get_size() <= lru.lru_get_max())  break;
+    if (!unmounting && lru.lru_get_size() <= max)  break;
 
     // trim!
     Dentry *dn = static_cast<Dentry*>(lru.lru_get_next_expire());
@@ -615,7 +618,7 @@ void Client::trim_cache(bool trim_kernel_dcache)
     trim_dentry(dn);
   }
 
-  if (trim_kernel_dcache && lru.lru_get_size() > lru.lru_get_max())
+  if (trim_kernel_dcache && lru.lru_get_size() > max)
     _invalidate_kernel_dcache();
 
   // hose root?
@@ -675,33 +678,11 @@ void Client::trim_dentry(Dentry *dn)
 }
 
 
-void Client::update_inode_file_bits(Inode *in,
-                                   uint64_t truncate_seq, uint64_t truncate_size,
-                                   uint64_t size, uint64_t change_attr,
-                                   uint64_t time_warp_seq, utime_t ctime,
-                                   utime_t mtime,
-                                   utime_t atime,
-                                   version_t inline_version,
-                                   bufferlist& inline_data,
-                                   int issued)
+void Client::update_inode_file_size(Inode *in, int issued, uint64_t size,
+                                   uint64_t truncate_seq, uint64_t truncate_size)
 {
-  bool warn = false;
-  ldout(cct, 10) << "update_inode_file_bits " << *in << " " << ccap_string(issued)
-          << " mtime " << mtime << dendl;
-  ldout(cct, 25) << "truncate_seq: mds " << truncate_seq <<  " local "
-          << in->truncate_seq << " time_warp_seq: mds " << time_warp_seq
-          << " local " << in->time_warp_seq << dendl;
   uint64_t prior_size = in->size;
 
-  if (inline_version > in->inline_version) {
-    in->inline_data = inline_data;
-    in->inline_version = inline_version;
-  }
-
-  /* always take a newer change attr */
-  if (change_attr > in->change_attr)
-    in->change_attr = change_attr;
-
   if (truncate_seq > in->truncate_seq ||
       (truncate_seq == in->truncate_seq && size > in->size)) {
     ldout(cct, 10) << "size " << in->size << " -> " << size << dendl;
@@ -737,7 +718,20 @@ void Client::update_inode_file_bits(Inode *in,
       ldout(cct, 0) << "Hmmm, truncate_seq && truncate_size changed on non-file inode!" << dendl;
     }
   }
-  
+}
+
+void Client::update_inode_file_time(Inode *in, int issued, uint64_t time_warp_seq,
+                                   utime_t ctime, utime_t mtime, utime_t atime)
+{
+  ldout(cct, 10) << __func__ << " " << *in << " " << ccap_string(issued)
+                << " ctime " << ctime << " mtime " << mtime << dendl;
+
+  if (time_warp_seq > in->time_warp_seq)
+    ldout(cct, 10) << " mds time_warp_seq " << time_warp_seq
+                  << " is higher than local time_warp_seq "
+                  << in->time_warp_seq << dendl;
+
+  int warn = false;
   // be careful with size, mtime, atime
   if (issued & (CEPH_CAP_FILE_EXCL|
                CEPH_CAP_FILE_WR|
@@ -748,9 +742,6 @@ void Client::update_inode_file_bits(Inode *in,
     if (ctime > in->ctime) 
       in->ctime = ctime;
     if (time_warp_seq > in->time_warp_seq) {
-      ldout(cct, 10) << "mds time_warp_seq " << time_warp_seq << " on inode " << *in
-              << " is higher than local time_warp_seq "
-              << in->time_warp_seq << dendl;
       //the mds updated times, so take those!
       in->mtime = mtime;
       in->atime = atime;
@@ -835,53 +826,59 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
   if (in->is_symlink())
     in->symlink = st->symlink;
 
-  if (was_new)
-    ldout(cct, 12) << "add_update_inode adding " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
-
-  if (!st->cap.caps)
-    return in;   // as with readdir returning indoes in different snaprealms (no caps!)
-
   // only update inode if mds info is strictly newer, or it is the same and projected (odd).
-  bool updating_inode = false;
-  int issued = 0;
-  if (st->version == 0 ||
-      (in->version & ~1) < st->version) {
-    updating_inode = true;
+  bool new_version = false;
+  if (in->version == 0 ||
+      ((st->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+       (in->version & ~1) < st->version))
+    new_version = true;
 
-    int implemented = 0;
-    issued = in->caps_issued(&implemented) | in->caps_dirty();
-    issued |= implemented;
+  int issued;
+  in->caps_issued(&issued);
+  issued |= in->caps_dirty();
+  int new_issued = ~issued & (int)st->cap.caps;
 
-    in->version = st->version;
+  if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+      !(issued & CEPH_CAP_AUTH_EXCL)) {
+    in->mode = st->mode;
+    in->uid = st->uid;
+    in->gid = st->gid;
+    in->btime = st->btime;
+  }
 
-    if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
-      in->mode = st->mode;
-      in->uid = st->uid;
-      in->gid = st->gid;
-      in->btime = st->btime;
-    }
+  if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+      !(issued & CEPH_CAP_LINK_EXCL)) {
+    in->nlink = st->nlink;
+  }
 
-    if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
-      in->nlink = st->nlink;
-    }
+  if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+    update_inode_file_time(in, issued, st->time_warp_seq,
+                          st->ctime, st->mtime, st->atime);
+  }
 
-    in->dirstat = st->dirstat;
-    in->rstat = st->rstat;
-    in->quota = st->quota;
+  if (new_version ||
+      (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
     in->layout = st->layout;
+    update_inode_file_size(in, issued, st->size, st->truncate_seq, st->truncate_size);
+  }
 
-    if (in->is_dir()) {
+  if (in->is_dir()) {
+    if (new_version || (new_issued & CEPH_CAP_FILE_SHARED)) {
+      in->dirstat = st->dirstat;
+    }
+    // dir_layout/rstat/quota are not tracked by capability, update them only if
+    // the inode stat is from auth mds
+    if (new_version || (st->cap.flags & CEPH_CAP_FLAG_AUTH)) {
       in->dir_layout = st->dir_layout;
       ldout(cct, 20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << dendl;
+      in->rstat = st->rstat;
+      in->quota = st->quota;
+    }
+    // move me if/when version reflects fragtree changes.
+    if (in->dirfragtree != st->dirfragtree) {
+      in->dirfragtree = st->dirfragtree;
+      _fragmap_remove_non_leaves(in);
     }
-
-    update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
-                          st->change_attr, st->time_warp_seq, st->ctime,
-                          st->mtime, st->atime, st->inline_version,
-                          st->inline_data, issued);
-  } else if (st->inline_version > in->inline_version) {
-    in->inline_data = st->inline_data;
-    in->inline_version = st->inline_version;
   }
 
   if ((in->xattr_version  == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
@@ -892,42 +889,54 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
     in->xattr_version = st->xattr_version;
   }
 
-  // move me if/when version reflects fragtree changes.
-  if (in->dirfragtree != st->dirfragtree) {
-    in->dirfragtree = st->dirfragtree;
-    _fragmap_remove_non_leaves(in);
+  if (st->inline_version > in->inline_version) {
+    in->inline_data = st->inline_data;
+    in->inline_version = st->inline_version;
   }
 
+  /* always take a newer change attr */
+  if (st->change_attr > in->change_attr)
+    in->change_attr = st->change_attr;
+
+  if (st->version > in->version)
+    in->version = st->version;
+
+  if (was_new)
+    ldout(cct, 12) << __func__ << " adding " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
+
+  if (!st->cap.caps)
+    return in;   // as with readdir returning indoes in different snaprealms (no caps!)
+
   if (in->snapid == CEPH_NOSNAP) {
     add_update_cap(in, session, st->cap.cap_id, st->cap.caps, st->cap.seq,
                   st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags,
                   request_perms);
-    if (in->auth_cap && in->auth_cap->session == session)
+    if (in->auth_cap && in->auth_cap->session == session) {
       in->max_size = st->max_size;
-  } else
-    in->snap_caps |= st->cap.caps;
-
-  // setting I_COMPLETE needs to happen after adding the cap
-  if (updating_inode &&
-      in->is_dir() &&
-      (st->cap.caps & CEPH_CAP_FILE_SHARED) &&
-      (issued & CEPH_CAP_FILE_EXCL) == 0 &&
-      in->dirstat.nfiles == 0 &&
-      in->dirstat.nsubdirs == 0) {
-    ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on empty dir " << *in << dendl;
-    in->flags |= I_COMPLETE | I_DIR_ORDERED;
-    if (in->dir) {
-      ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
-                    << in->dir->dentries.size() << " entries, marking all dentries null" << dendl;
-      in->dir->readdir_cache.clear();
-      for (auto p = in->dir->dentries.begin();
-          p != in->dir->dentries.end();
-          ++p) {
-       unlink(p->second, true, true);  // keep dir, keep dentry
+      in->rstat = st->rstat;
+    }
+
+    // setting I_COMPLETE needs to happen after adding the cap
+    if (in->is_dir() &&
+       (st->cap.caps & CEPH_CAP_FILE_SHARED) &&
+       (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+       in->dirstat.nfiles == 0 &&
+       in->dirstat.nsubdirs == 0) {
+      ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on empty dir " << *in << dendl;
+      in->flags |= I_COMPLETE | I_DIR_ORDERED;
+      if (in->dir) {
+       ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
+                      << in->dir->dentries.size() << " entries, marking all dentries null" << dendl;
+       in->dir->readdir_cache.clear();
+       for (const auto& p : in->dir->dentries) {
+         unlink(p.second, true, true);  // keep dir, keep dentry
+       }
+       if (in->dir->dentries.empty())
+         close_dir(in->dir);
       }
-      if (in->dir->dentries.empty())
-       close_dir(in->dir);
     }
+  } else {
+    in->snap_caps |= st->cap.caps;
   }
 
   return in;
@@ -1456,6 +1465,10 @@ mds_rank_t Client::choose_target_mds(MetaRequest *req, Inode** phash_diri)
        mds = in->fragmap[fg];
        if (phash_diri)
          *phash_diri = in;
+      } else if (in->auth_cap) {
+       mds = in->auth_cap->session->mds_num;
+      }
+      if (mds >= 0) {
        ldout(cct, 10) << "choose_target_mds from dirfragtree hash" << dendl;
        goto out;
       }
@@ -1505,6 +1518,10 @@ void Client::connect_mds_targets(mds_rank_t mds)
 void Client::dump_mds_sessions(Formatter *f)
 {
   f->dump_int("id", get_nodeid().v);
+  entity_inst_t inst(messenger->get_myname(), messenger->get_myaddr());
+  f->dump_object("inst", inst);
+  f->dump_stream("inst_str") << inst;
+  f->dump_stream("addr_str") << inst.addr;
   f->open_array_section("sessions");
   for (map<mds_rank_t,MetaSession*>::const_iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) {
     f->open_object_section("session");
@@ -2076,6 +2093,10 @@ void Client::handle_client_session(MClientSession *m)
     break;
 
   case CEPH_SESSION_STALE:
+    // invalidate session caps/leases
+    session->cap_gen++;
+    session->cap_ttl = ceph_clock_now();
+    session->cap_ttl -= 1;
     renew_caps(session);
     break;
 
@@ -2972,10 +2993,15 @@ Dentry* Client::link(Dir *dir, const string& name, Inode *in, Dentry *dn)
     dn->dir = dir;
     dir->dentries[dn->name] = dn;
     lru.lru_insert_mid(dn);    // mid or top?
+    if (!in)
+      dir->num_null_dentries++;
 
     ldout(cct, 15) << "link dir " << dir->parent_inode << " '" << name << "' to inode " << in
                   << " dn " << dn << " (new dn)" << dendl;
   } else {
+    assert(!dn->inode);
+    if (in)
+      dir->num_null_dentries--;
     ldout(cct, 15) << "link dir " << dir->parent_inode << " '" << name << "' to inode " << in
                   << " dn " << dn << " (old dn)" << dendl;
   }
@@ -3032,11 +3058,15 @@ void Client::unlink(Dentry *dn, bool keepdir, bool keepdentry)
 
   if (keepdentry) {
     dn->lease_mds = -1;
+    if (in)
+      dn->dir->num_null_dentries++;
   } else {
     ldout(cct, 15) << "unlink  removing '" << dn->name << "' dn " << dn << dendl;
 
     // unlink from dir
     dn->dir->dentries.erase(dn->name);
+    if (!in)
+      dn->dir->num_null_dentries--;
     if (dn->dir->is_empty() && !keepdir)
       close_dir(dn->dir);
     dn->dir = 0;
@@ -3232,7 +3262,7 @@ void Client::cap_delay_requeue(Inode *in)
   ldout(cct, 10) << "cap_delay_requeue on " << *in << dendl;
   in->hold_caps_until = ceph_clock_now();
   in->hold_caps_until += cct->_conf->client_caps_release_delay;
-  delayed_caps.push_back(&in->cap_item);
+  delayed_list.push_back(&in->delay_cap_item);
 }
 
 void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
@@ -3408,8 +3438,10 @@ void Client::check_caps(Inode *in, unsigned flags)
     return;   // guard if at end of func
 
   if ((revoking & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) &&
-      (used & CEPH_CAP_FILE_CACHE) && !(used & CEPH_CAP_FILE_BUFFER))
-    _release(in);
+      (used & CEPH_CAP_FILE_CACHE) && !(used & CEPH_CAP_FILE_BUFFER)) {
+    if (_release(in))
+      used &= ~CEPH_CAP_FILE_CACHE;
+  }
 
   if (!in->cap_snaps.empty())
     flush_snaps(in);
@@ -3743,8 +3775,11 @@ void Client::_invalidate_inode_cache(Inode *in)
   ldout(cct, 10) << "_invalidate_inode_cache " << *in << dendl;
 
   // invalidate our userspace inode cache
-  if (cct->_conf->client_oc)
+  if (cct->_conf->client_oc) {
     objectcacher->release_set(&in->oset);
+    if (!objectcacher->set_is_empty(&in->oset))
+      lderr(cct) << "failed to invalidate cache for " << *in << dendl;
+  }
 
   _schedule_invalidate_callback(in, 0, 0);
 }
@@ -3757,7 +3792,7 @@ void Client::_invalidate_inode_cache(Inode *in, int64_t off, int64_t len)
   if (cct->_conf->client_oc) {
     vector<ObjectExtent> ls;
     Striper::file_to_extents(cct, in->ino, &in->layout, off, len, in->truncate_size, ls);
-    objectcacher->discard_set(&in->oset, ls);
+    objectcacher->discard_writeback(&in->oset, ls, nullptr);
   }
 
   _schedule_invalidate_callback(in, off, len);
@@ -3784,7 +3819,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
   }
 
   if (objecter->osdmap_pool_full(in->layout.pool_id)) {
-    ldout(cct, 1) << __func__ << ": FULL, purging for ENOSPC" << dendl;
+    ldout(cct, 8) << __func__ << ": FULL, purging for ENOSPC" << dendl;
     objectcacher->purge_set(&in->oset);
     if (onfinish) {
       onfinish->complete(-ENOSPC);
@@ -3896,7 +3931,6 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
     cap->session = mds_session;
     cap->inode = in;
     cap->gen = mds_session->cap_gen;
-    cap_list.push_back(&in->cap_item);
   }
 
   check_cap_issue(in, cap, issued);
@@ -3915,11 +3949,12 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
 
   unsigned old_caps = cap->issued;
   cap->cap_id = cap_id;
-  cap->issued |= issued;
+  cap->issued = issued;
   cap->implemented |= issued;
   cap->seq = seq;
   cap->issue_seq = seq;
   cap->mseq = mseq;
+  cap->gen = mds_session->cap_gen;
   cap->latest_perms = cap_perms;
   ldout(cct, 10) << "add_update_cap issued " << ccap_string(old_caps) << " -> " << ccap_string(cap->issued)
           << " from mds." << mds
@@ -4015,7 +4050,7 @@ void Client::remove_session_caps(MetaSession *s)
        in->flushing_cap_tids.clear();
       }
       in->flushing_caps = 0;
-      in->dirty_caps = 0;
+      in->mark_caps_clean();
       put_inode(in);
     }
   }
@@ -4023,22 +4058,46 @@ void Client::remove_session_caps(MetaSession *s)
   sync_cond.Signal();
 }
 
+int Client::_do_remount(bool retry_on_error)
+{
+  uint64_t max_retries = cct->_conf->get_val<uint64_t>("mds_max_retries_on_remount_failure");
+
+  errno = 0;
+  int r = remount_cb(callback_handle);
+  if (r == 0) {
+    retries_on_invalidate = 0;
+  } else {
+    int e = errno;
+    client_t whoami = get_nodeid();
+    if (r == -1) {
+      lderr(cct) <<
+          "failed to remount (to trim kernel dentries): "
+          "errno = " << e << " (" << strerror(e) << ")" << dendl;
+    } else {
+      lderr(cct) <<
+          "failed to remount (to trim kernel dentries): "
+          "return code = " << r << dendl;
+    }
+    bool should_abort =
+      (cct->_conf->get_val<bool>("client_die_on_failed_remount") ||
+       cct->_conf->get_val<bool>("client_die_on_failed_dentry_invalidate")) &&
+      !(retry_on_error && (++retries_on_invalidate < max_retries));
+    if (should_abort && !unmounting) {
+      lderr(cct) << "failed to remount for kernel dentry trimming; quitting!" << dendl;
+      ceph_abort();
+    }
+  }
+  return r;
+}
+
 class C_Client_Remount : public Context  {
 private:
   Client *client;
 public:
   explicit C_Client_Remount(Client *c) : client(c) {}
   void finish(int r) override {
-    assert (r == 0);
-    r = client->remount_cb(client->callback_handle);
-    if (r != 0) {
-      client_t whoami = client->get_nodeid();
-      lderr(client->cct) << "tried to remount (to trim kernel dentries) and got error "
-                        << r << dendl;
-      if (client->require_remount && !client->unmounting) {
-       assert(0 == "failed to remount for kernel dentry trimming");
-      }
-    }
+    assert(r == 0);
+    client->_do_remount(true);
   }
 };
 
@@ -4046,12 +4105,14 @@ void Client::_invalidate_kernel_dcache()
 {
   if (unmounting)
     return;
-  if (can_invalidate_dentries && dentry_invalidate_cb && root->dir) {
-    for (ceph::unordered_map<string, Dentry*>::iterator p = root->dir->dentries.begin();
-        p != root->dir->dentries.end();
-        ++p) {
-      if (p->second->inode)
-       _schedule_invalidate_dentry_callback(p->second, false);
+  if (can_invalidate_dentries) {
+    if (dentry_invalidate_cb && root->dir) {
+      for (ceph::unordered_map<string, Dentry*>::iterator p = root->dir->dentries.begin();
+         p != root->dir->dentries.end();
+         ++p) {
+       if (p->second->inode)
+        _schedule_invalidate_dentry_callback(p->second, false);
+      }
     }
   } else if (remount_cb) {
     // Hacky:
@@ -4060,18 +4121,45 @@ void Client::_invalidate_kernel_dcache()
   }
 }
 
-void Client::trim_caps(MetaSession *s, int max)
+void Client::_trim_negative_child_dentries(InodeRef& in)
+{
+  if (!in->is_dir())
+    return;
+
+  Dir* dir = in->dir;
+  if (dir && dir->dentries.size() == dir->num_null_dentries) {
+    for (auto p = dir->dentries.begin(); p != dir->dentries.end(); ) {
+      Dentry *dn = p->second;
+      ++p;
+      assert(!dn->inode);
+      if (dn->lru_is_expireable())
+       unlink(dn, true, false);  // keep dir, drop dentry
+    }
+    if (dir->dentries.empty()) {
+      close_dir(dir);
+    }
+  }
+
+  if (in->flags & I_SNAPDIR_OPEN) {
+    InodeRef snapdir = open_snapdir(in.get());
+    _trim_negative_child_dentries(snapdir);
+  }
+}
+
+void Client::trim_caps(MetaSession *s, uint64_t max)
 {
   mds_rank_t mds = s->mds_num;
-  int caps_size = s->caps.size();
+  size_t caps_size = s->caps.size();
   ldout(cct, 10) << "trim_caps mds." << mds << " max " << max 
     << " caps " << caps_size << dendl;
 
-  int trimmed = 0;
-  xlist<Cap*>::iterator p = s->caps.begin();
+  uint64_t trimmed = 0;
+  auto p = s->caps.begin();
+  std::set<Dentry *> to_trim; /* this avoids caps other than the one we're
+                               * looking at from getting deleted during traversal. */
   while ((caps_size - trimmed) > max && !p.end()) {
     Cap *cap = *p;
-    Inode *in = cap->inode;
+    InodeRef in(cap->inode);
 
     // Increment p early because it will be invalidated if cap
     // is deleted inside remove_cap
@@ -4081,16 +4169,16 @@ void Client::trim_caps(MetaSession *s, int max)
       int mine = cap->issued | cap->implemented;
       int oissued = in->auth_cap ? in->auth_cap->issued : 0;
       // disposable non-auth cap
-      if (!(get_caps_used(in) & ~oissued & mine)) {
+      if (!(get_caps_used(in.get()) & ~oissued & mine)) {
        ldout(cct, 20) << " removing unused, unneeded non-auth cap on " << *in << dendl;
-       remove_cap(cap, true);
+       cap = (remove_cap(cap, true), nullptr);
        trimmed++;
       }
     } else {
       ldout(cct, 20) << " trying to trim dentries for " << *in << dendl;
+      _trim_negative_child_dentries(in);
       bool all = true;
       set<Dentry*>::iterator q = in->dn_set.begin();
-      InodeRef tmp_ref(in);
       while (q != in->dn_set.end()) {
        Dentry *dn = *q++;
        if (dn->lru_is_expireable()) {
@@ -4101,7 +4189,8 @@ void Client::trim_caps(MetaSession *s, int max)
            // the end of this function.
            _schedule_invalidate_dentry_callback(dn, true);
          }
-         trim_dentry(dn);
+          ldout(cct, 20) << " queueing dentry for trimming: " << dn->name << dendl;
+          to_trim.insert(dn);
         } else {
           ldout(cct, 20) << "  not expirable: " << dn->name << dendl;
          all = false;
@@ -4113,8 +4202,14 @@ void Client::trim_caps(MetaSession *s, int max)
       }
     }
   }
+  ldout(cct, 20) << " trimming queued dentries: " << dendl;
+  for (const auto &dn : to_trim) {
+    trim_dentry(dn);
+  }
+  to_trim.clear();
 
-  if (s->caps.size() > max)
+  caps_size = s->caps.size();
+  if (caps_size > max)
     _invalidate_kernel_dcache();
 }
 
@@ -4128,15 +4223,6 @@ void Client::force_session_readonly(MetaSession *s)
   }
 }
 
-void Client::mark_caps_dirty(Inode *in, int caps)
-{
-  ldout(cct, 10) << "mark_caps_dirty " << *in << " " << ccap_string(in->dirty_caps) << " -> "
-          << ccap_string(in->dirty_caps | caps) << dendl;
-  if (caps && !in->caps_dirty())
-    in->get();
-  in->dirty_caps |= caps;
-}
-
 int Client::mark_caps_flushing(Inode *in, ceph_tid_t* ptid)
 {
   MetaSession *session = in->auth_cap->session;
@@ -4155,7 +4241,7 @@ int Client::mark_caps_flushing(Inode *in, ceph_tid_t* ptid)
   }
 
   in->flushing_caps |= flushing;
-  in->dirty_caps = 0;
+  in->mark_caps_clean();
  
   if (!in->flushing_cap_item.is_on_list())
     session->flushing_caps.push_back(&in->flushing_cap_item);
@@ -4191,20 +4277,20 @@ void Client::adjust_session_flushing_caps(Inode *in, MetaSession *old_s,  MetaSe
 void Client::flush_caps_sync()
 {
   ldout(cct, 10) << __func__ << dendl;
-  xlist<Inode*>::iterator p = delayed_caps.begin();
+  xlist<Inode*>::iterator p = delayed_list.begin();
   while (!p.end()) {
     unsigned flags = CHECK_CAPS_NODELAY;
     Inode *in = *p;
 
     ++p;
-    delayed_caps.pop_front();
-    if (p.end() && cap_list.empty())
+    delayed_list.pop_front();
+    if (p.end() && dirty_list.empty())
       flags |= CHECK_CAPS_SYNCHRONOUS;
     check_caps(in, flags);
   }
 
   // other caps, too
-  p = cap_list.begin();
+  p = dirty_list.begin();
   while (!p.end()) {
     unsigned flags = CHECK_CAPS_NODELAY;
     Inode *in = *p;
@@ -4771,7 +4857,7 @@ void Client::handle_cap_export(MetaSession *session, Inode *in, MClientCaps *m)
       MetaSession *tsession = _get_or_open_mds_session(peer_mds);
       if (in->caps.count(peer_mds)) {
        Cap *tcap = in->caps[peer_mds];
-       if (tcap->cap_id != m->peer.cap_id ||
+       if (tcap->cap_id == m->peer.cap_id &&
            ceph_seq_cmp(tcap->seq, m->peer.seq) < 0) {
          tcap->cap_id = m->peer.cap_id;
          tcap->seq = m->peer.seq - 1;
@@ -4810,13 +4896,11 @@ void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m)
           << " size " << in->size << " -> " << m->get_size()
           << dendl;
   
-  int implemented = 0;
-  int issued = in->caps_issued(&implemented) | in->caps_dirty();
-  issued |= implemented;
-  update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(),
-                        m->get_size(), m->get_change_attr(), m->get_time_warp_seq(),
-                        m->get_ctime(), m->get_mtime(), m->get_atime(),
-                         m->inline_version, m->inline_data, issued);
+  int issued;
+  in->caps_issued(&issued);
+  issued |= in->caps_dirty();
+  update_inode_file_size(in, issued, m->get_size(),
+                        m->get_truncate_seq(), m->get_truncate_size());
   m->put();
 }
 
@@ -5010,42 +5094,65 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
                << " caps now " << ccap_string(new_caps)
                << " was " << ccap_string(old_caps) << dendl;
   cap->seq = m->get_seq();
-
-  in->layout = m->get_layout();
+  cap->gen = session->cap_gen;
 
   // update inode
-  int implemented = 0;
-  int issued = in->caps_issued(&implemented) | in->caps_dirty();
-  issued |= implemented;
+  int issued;
+  in->caps_issued(&issued);
+  issued |= in->caps_dirty();
 
-  if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+  if ((new_caps & CEPH_CAP_AUTH_SHARED) &&
+      !(issued & CEPH_CAP_AUTH_EXCL)) {
     in->mode = m->head.mode;
     in->uid = m->head.uid;
     in->gid = m->head.gid;
     in->btime = m->btime;
   }
   bool deleted_inode = false;
-  if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+  if ((new_caps & CEPH_CAP_LINK_SHARED) &&
+      !(issued & CEPH_CAP_LINK_EXCL)) {
     in->nlink = m->head.nlink;
     if (in->nlink == 0 &&
        (new_caps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
       deleted_inode = true;
   }
-  if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
+  if (!(issued & CEPH_CAP_XATTR_EXCL) &&
       m->xattrbl.length() &&
       m->head.xattr_version > in->xattr_version) {
     bufferlist::iterator p = m->xattrbl.begin();
     ::decode(in->xattrs, p);
     in->xattr_version = m->head.xattr_version;
   }
-  update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
-                        m->get_change_attr(), m->get_time_warp_seq(), m->get_ctime(),
-                        m->get_mtime(), m->get_atime(),
-                        m->inline_version, m->inline_data, issued);
+
+  if ((new_caps & CEPH_CAP_FILE_SHARED) && m->dirstat_is_valid()) {
+    in->dirstat.nfiles = m->get_nfiles();
+    in->dirstat.nsubdirs = m->get_nsubdirs();
+  }
+
+  if (new_caps & CEPH_CAP_ANY_RD) {
+    update_inode_file_time(in, issued, m->get_time_warp_seq(),
+                          m->get_ctime(), m->get_mtime(), m->get_atime());
+  }
+
+  if (new_caps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
+    in->layout = m->get_layout();
+    update_inode_file_size(in, issued, m->get_size(),
+                          m->get_truncate_seq(), m->get_truncate_size());
+  }
+
+  if (m->inline_version > in->inline_version) {
+    in->inline_data = m->inline_data;
+    in->inline_version = m->inline_version;
+  }
+
+  /* always take a newer change attr */
+  if (m->get_change_attr() > in->change_attr)
+    in->change_attr = m->get_change_attr();
 
   // max_size
   if (cap == in->auth_cap &&
-      m->get_max_size() != in->max_size) {
+      (new_caps & CEPH_CAP_ANY_FILE_WR) &&
+      (m->get_max_size() != in->max_size)) {
     ldout(cct, 10) << "max_size " << in->max_size << " -> " << m->get_max_size() << dendl;
     in->max_size = m->get_max_size();
     if (in->max_size > in->wanted_max_size) {
@@ -5061,22 +5168,28 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
   check_cap_issue(in, cap, new_caps);
 
   // update caps
-  if (old_caps & ~new_caps) { 
-    ldout(cct, 10) << "  revocation of " << ccap_string(~new_caps & old_caps) << dendl;
+  int revoked = old_caps & ~new_caps;
+  if (revoked) {
+    ldout(cct, 10) << "  revocation of " << ccap_string(revoked) << dendl;
     cap->issued = new_caps;
     cap->implemented |= new_caps;
 
-    if (((used & ~new_caps) & CEPH_CAP_FILE_BUFFER)
-        && !_flush(in, new C_Client_FlushComplete(this, in))) {
+    // recall delegations if we're losing caps necessary for them
+    if (revoked & ceph_deleg_caps_for_type(CEPH_DELEGATION_RD))
+      in->recall_deleg(false);
+    else if (revoked & ceph_deleg_caps_for_type(CEPH_DELEGATION_WR))
+      in->recall_deleg(true);
+
+    if ((used & revoked & CEPH_CAP_FILE_BUFFER) &&
+       !_flush(in, new C_Client_FlushComplete(this, in))) {
       // waitin' for flush
-    } else if ((old_caps & ~new_caps) & CEPH_CAP_FILE_CACHE) {
+    } else if (revoked & CEPH_CAP_FILE_CACHE) {
       if (_release(in))
        check = true;
     } else {
       cap->wanted = 0; // don't let check_caps skip sending a response to MDS
       check = true;
     }
-
   } else if (old_caps == new_caps) {
     ldout(cct, 10) << "  caps unchanged at " << ccap_string(old_caps) << dendl;
   } else {
@@ -5111,63 +5224,6 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
   m->put();
 }
 
-int Client::_getgrouplist(gid_t** sgids, uid_t uid, gid_t gid)
-{
-  // cppcheck-suppress variableScope
-  int sgid_count;
-  gid_t *sgid_buf;
-
-  if (getgroups_cb) {
-    sgid_count = getgroups_cb(callback_handle, &sgid_buf);
-    if (sgid_count > 0) {
-      *sgids = sgid_buf;
-      return sgid_count;
-    }
-  }
-
-#if HAVE_GETGROUPLIST
-  struct passwd *pw;
-  pw = getpwuid(uid);
-  if (pw == NULL) {
-    ldout(cct, 3) << "getting user entry failed" << dendl;
-    return -errno;
-  }
-  //use PAM to get the group list
-  // initial number of group entries, defaults to posix standard of 16
-  // PAM implementations may provide more than 16 groups....
-  sgid_count = 16;
-  sgid_buf = (gid_t*)malloc(sgid_count * sizeof(gid_t));
-  if (sgid_buf == NULL) {
-    ldout(cct, 3) << "allocating group memory failed" << dendl;
-    return -ENOMEM;
-  }
-
-  while (1) {
-#if defined(__APPLE__)
-    if (getgrouplist(pw->pw_name, gid, (int*)sgid_buf, &sgid_count) == -1) {
-#else
-    if (getgrouplist(pw->pw_name, gid, sgid_buf, &sgid_count) == -1) {
-#endif
-      // we need to resize the group list and try again
-      void *_realloc = NULL;
-      if ((_realloc = realloc(sgid_buf, sgid_count * sizeof(gid_t))) == NULL) {
-       ldout(cct, 3) << "allocating group memory failed" << dendl;
-       free(sgid_buf);
-       return -ENOMEM;
-      }
-      sgid_buf = (gid_t*)_realloc;
-      continue;
-    }
-    // list was successfully retrieved
-    break;
-  }
-  *sgids = sgid_buf;
-  return sgid_count;
-#else
-  return 0;
-#endif
-}
-
 int Client::inode_permission(Inode *in, const UserPerm& perms, unsigned want)
 {
   if (perms.uid() == 0)
@@ -5200,7 +5256,7 @@ int Client::xattr_permission(Inode *in, const char *name, unsigned want,
     r = inode_permission(in, perms, want);
   }
 out:
-  ldout(cct, 3) << __func__ << " " << in << " = " << r <<  dendl;
+  ldout(cct, 5) << __func__ << " " << in << " = " << r <<  dendl;
   return r;
 }
 
@@ -5212,7 +5268,7 @@ ostream& operator<<(ostream &out, const UserPerm& perm) {
 int Client::may_setattr(Inode *in, struct ceph_statx *stx, int mask,
                        const UserPerm& perms)
 {
-  ldout(cct, 20) << __func__ << *in << "; " << perms << dendl;
+  ldout(cct, 20) << __func__ << " " << *in << "; " << perms << dendl;
   int r = _getattr_for_perm(in, perms);
   if (r < 0)
     goto out;
@@ -5268,7 +5324,7 @@ out:
 
 int Client::may_open(Inode *in, int flags, const UserPerm& perms)
 {
-  ldout(cct, 20) << __func__ << *in << "; " << perms << dendl;
+  ldout(cct, 20) << __func__ << " " << *in << "; " << perms << dendl;
   unsigned want = 0;
 
   if ((flags & O_ACCMODE) == O_WRONLY)
@@ -5305,7 +5361,7 @@ out:
 
 int Client::may_lookup(Inode *dir, const UserPerm& perms)
 {
-  ldout(cct, 20) << __func__ << *dir << "; " << perms << dendl;
+  ldout(cct, 20) << __func__ << " " << *dir << "; " << perms << dendl;
   int r = _getattr_for_perm(dir, perms);
   if (r < 0)
     goto out;
@@ -5318,7 +5374,7 @@ out:
 
 int Client::may_create(Inode *dir, const UserPerm& perms)
 {
-  ldout(cct, 20) << __func__ << *dir << "; " << perms << dendl;
+  ldout(cct, 20) << __func__ << " " << *dir << "; " << perms << dendl;
   int r = _getattr_for_perm(dir, perms);
   if (r < 0)
     goto out;
@@ -5331,7 +5387,7 @@ out:
 
 int Client::may_delete(Inode *dir, const char *name, const UserPerm& perms)
 {
-  ldout(cct, 20) << __func__ << *dir << "; " << "; name " << name << "; " << perms << dendl;
+  ldout(cct, 20) << __func__ << " " << *dir << "; " << "; name " << name << "; " << perms << dendl;
   int r = _getattr_for_perm(dir, perms);
   if (r < 0)
     goto out;
@@ -5356,7 +5412,7 @@ out:
 
 int Client::may_hardlink(Inode *in, const UserPerm& perms)
 {
-  ldout(cct, 20) << __func__ << *in << "; " << perms << dendl;
+  ldout(cct, 20) << __func__ << " " << *in << "; " << perms << dendl;
   int r = _getattr_for_perm(in, perms);
   if (r < 0)
     goto out;
@@ -5562,7 +5618,8 @@ int Client::mds_command(
 {
   Mutex::Locker lock(client_lock);
 
-  assert(initialized);
+  if (!initialized)
+    return -ENOTCONN;
 
   int r;
   r = authenticate();
@@ -5676,6 +5733,8 @@ int Client::mount(const std::string &mount_root, const UserPerm& perms,
     return 0;
   }
 
+  unmounting = false;
+
   int r = authenticate();
   if (r < 0) {
     lderr(cct) << "authentication failed: " << cpp_strerror(r) << dendl;
@@ -5821,15 +5880,16 @@ void Client::flush_mdlog(MetaSession *session)
 }
 
 
-void Client::unmount()
+void Client::_unmount()
 {
-  Mutex::Locker lock(client_lock);
-
-  assert(mounted);  // caller is confused?
+  if (unmounting)
+    return;
 
   ldout(cct, 2) << "unmounting" << dendl;
   unmounting = true;
 
+  deleg_timeout = 0;
+
   flush_mdlog_sync(); // flush the mdlog for pending requests, if any
   while (!mds_requests.empty()) {
     ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
@@ -5913,7 +5973,6 @@ void Client::unmount()
   wait_sync_caps(last_flush_tid);
 
   // empty lru cache
-  lru.lru_set_max(0);
   trim_cache();
 
   while (lru.lru_get_size() > 0 ||
@@ -5944,18 +6003,11 @@ void Client::unmount()
   ldout(cct, 2) << "unmounted." << dendl;
 }
 
-
-
-class C_C_Tick : public Context {
-  Client *client;
-public:
-  explicit C_C_Tick(Client *c) : client(c) {}
-  void finish(int r) override {
-    // Called back via Timer, which takes client_lock for us
-    assert(client->client_lock.is_locked_by_me());
-    client->tick();
-  }
-};
+void Client::unmount()
+{
+  Mutex::Locker lock(client_lock);
+  _unmount();
+}
 
 void Client::flush_cap_releases()
 {
@@ -5985,9 +6037,13 @@ void Client::tick()
   }
 
   ldout(cct, 21) << "tick" << dendl;
-  tick_event = new C_C_Tick(this);
-  timer.add_event_after(cct->_conf->client_tick_interval, tick_event);
-
+  tick_event = timer.add_event_after(
+    cct->_conf->client_tick_interval,
+    new FunctionContext([this](int) {
+       // Called back via Timer, which takes client_lock for us
+       assert(client_lock.is_locked_by_me());
+       tick();
+      }));
   utime_t now = ceph_clock_now();
 
   if (!mounted && !mds_requests.empty()) {
@@ -6016,14 +6072,13 @@ void Client::tick()
   }
 
   // delayed caps
-  xlist<Inode*>::iterator p = delayed_caps.begin();
+  xlist<Inode*>::iterator p = delayed_list.begin();
   while (!p.end()) {
     Inode *in = *p;
     ++p;
     if (in->hold_caps_until > now)
       break;
-    delayed_caps.pop_front();
-    cap_list.push_back(&in->cap_item);
+    delayed_list.pop_front();
     check_caps(in, CHECK_CAPS_NODELAY);
   }
 
@@ -6120,7 +6175,7 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
             << " seq " << dn->lease_seq
             << dendl;
 
-    if (!dn->inode || dn->inode->caps_issued_mask(mask)) {
+    if (!dn->inode || dn->inode->caps_issued_mask(mask, true)) {
       // is dn lease valid?
       utime_t now = ceph_clock_now();
       if (dn->lease_mds >= 0 &&
@@ -6138,9 +6193,9 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
                       << " vs lease_gen " << dn->lease_gen << dendl;
       }
       // dir lease?
-      if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
+      if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED, true)) {
        if (dn->cap_shared_gen == dir->shared_gen &&
-           (!dn->inode || dn->inode->caps_issued_mask(mask)))
+           (!dn->inode || dn->inode->caps_issued_mask(mask, true)))
              goto hit_dn;
        if (!dn->inode && (dir->flags & I_COMPLETE)) {
          ldout(cct, 10) << "_lookup concluded ENOENT locally for "
@@ -6153,7 +6208,7 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
     }
   } else {
     // can we conclude ENOENT locally?
-    if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED) &&
+    if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED, true) &&
        (dir->flags & I_COMPLETE)) {
       ldout(cct, 10) << "_lookup concluded ENOENT locally for " << *dir << " dn '" << dname << "'" << dendl;
       return -ENOENT;
@@ -6303,6 +6358,9 @@ int Client::link(const char *relexisting, const char *relpath, const UserPerm& p
   tout(cct) << relexisting << std::endl;
   tout(cct) << relpath << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath existing(relexisting);
 
   InodeRef in, dir;
@@ -6342,6 +6400,9 @@ int Client::unlink(const char *relpath, const UserPerm& perm)
   tout(cct) << "unlink" << std::endl;
   tout(cct) << relpath << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   if (std::string(relpath) == "/")
     return -EISDIR;
 
@@ -6367,6 +6428,9 @@ int Client::rename(const char *relfrom, const char *relto, const UserPerm& perm)
   tout(cct) << relfrom << std::endl;
   tout(cct) << relto << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   if (std::string(relfrom) == "/" || std::string(relto) == "/")
     return -EBUSY;
 
@@ -6408,6 +6472,9 @@ int Client::mkdir(const char *relpath, mode_t mode, const UserPerm& perm)
   tout(cct) << mode << std::endl;
   ldout(cct, 10) << "mkdir: " << relpath << dendl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   if (std::string(relpath) == "/")
     return -EEXIST;
 
@@ -6434,6 +6501,9 @@ int Client::mkdirs(const char *relpath, mode_t mode, const UserPerm& perms)
   tout(cct) << relpath << std::endl;
   tout(cct) << mode << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   //get through existing parts of path
   filepath path(relpath);
   unsigned int i;
@@ -6486,6 +6556,9 @@ int Client::rmdir(const char *relpath, const UserPerm& perms)
   tout(cct) << "rmdir" << std::endl;
   tout(cct) << relpath << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   if (std::string(relpath) == "/")
     return -EBUSY;
 
@@ -6512,6 +6585,9 @@ int Client::mknod(const char *relpath, mode_t mode, const UserPerm& perms, dev_t
   tout(cct) << mode << std::endl;
   tout(cct) << rdev << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   if (std::string(relpath) == "/")
     return -EEXIST;
 
@@ -6539,6 +6615,9 @@ int Client::symlink(const char *target, const char *relpath, const UserPerm& per
   tout(cct) << target << std::endl;
   tout(cct) << relpath << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   if (std::string(relpath) == "/")
     return -EEXIST;
 
@@ -6563,6 +6642,9 @@ int Client::readlink(const char *relpath, char *buf, loff_t size, const UserPerm
   tout(cct) << "readlink" << std::endl;
   tout(cct) << relpath << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms, false);
@@ -6590,7 +6672,7 @@ int Client::_readlink(Inode *in, char *buf, size_t size)
 
 int Client::_getattr(Inode *in, int mask, const UserPerm& perms, bool force)
 {
-  bool yes = in->caps_issued_mask(mask);
+  bool yes = in->caps_issued_mask(mask, true);
 
   ldout(cct, 10) << "_getattr mask " << ccap_string(mask) << " issued=" << yes << dendl;
   if (yes && !force)
@@ -6655,11 +6737,11 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
     in->cap_dirtier_uid = perms.uid();
     in->cap_dirtier_gid = perms.gid();
     if (issued & CEPH_CAP_AUTH_EXCL)
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
     else if (issued & CEPH_CAP_FILE_EXCL)
-      mark_caps_dirty(in, CEPH_CAP_FILE_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_FILE_EXCL);
     else if (issued & CEPH_CAP_XATTR_EXCL)
-      mark_caps_dirty(in, CEPH_CAP_XATTR_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_XATTR_EXCL);
     else
       mask |= CEPH_SETATTR_CTIME;
   }
@@ -6674,7 +6756,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->uid = stx->stx_uid;
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_UID;
       kill_sguid = true;
       ldout(cct,10) << "changing uid to " << stx->stx_uid << dendl;
@@ -6684,7 +6766,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->gid = stx->stx_gid;
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_GID;
       kill_sguid = true;
       ldout(cct,10) << "changing gid to " << stx->stx_gid << dendl;
@@ -6695,15 +6777,13 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->mode = (in->mode & ~07777) | (stx->stx_mode & 07777);
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_MODE;
       ldout(cct,10) << "changing mode to " << stx->stx_mode << dendl;
-    } else if (kill_sguid && S_ISREG(in->mode)) {
+    } else if (kill_sguid && S_ISREG(in->mode) && (in->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
       /* Must squash the any setuid/setgid bits with an ownership change */
-      in->mode &= ~S_ISUID;
-      if ((in->mode & (S_ISGID|S_IXGRP)) == (S_ISGID|S_IXGRP))
-       in->mode &= ~S_ISGID;
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mode &= ~(S_ISUID|S_ISGID);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
     }
 
     if (mask & CEPH_SETATTR_BTIME) {
@@ -6711,7 +6791,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->btime = utime_t(stx->stx_btime);
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_BTIME;
       ldout(cct,10) << "changing btime to " << in->btime << dendl;
     }
@@ -6730,7 +6810,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->time_warp_seq++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_FILE_EXCL);
       mask &= ~(CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME);
     }
   }
@@ -6772,7 +6852,7 @@ force_request:
   }
   if (mask & CEPH_SETATTR_MTIME) {
     req->head.args.setattr.mtime = utime_t(stx->stx_mtime);
-    req->inode_drop |= CEPH_CAP_AUTH_SHARED | CEPH_CAP_FILE_RD |
+    req->inode_drop |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
       CEPH_CAP_FILE_WR;
   }
   if (mask & CEPH_SETATTR_ATIME) {
@@ -6789,7 +6869,7 @@ force_request:
       ldout(cct,10) << "unable to set size to " << stx->stx_size << ". Too large!" << dendl;
       return -EFBIG;
     }
-    req->inode_drop |= CEPH_CAP_AUTH_SHARED | CEPH_CAP_FILE_RD |
+    req->inode_drop |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
       CEPH_CAP_FILE_WR;
   }
   req->head.args.setattr.mask = mask;
@@ -6845,6 +6925,14 @@ int Client::_setattr(InodeRef &in, struct stat *attr, int mask,
 
   stat_to_statx(attr, &stx);
   mask &= ~CEPH_SETATTR_BTIME;
+
+  if ((mask & CEPH_SETATTR_UID) && attr->st_uid == static_cast<uid_t>(-1)) {
+    mask &= ~CEPH_SETATTR_UID;
+  }
+  if ((mask & CEPH_SETATTR_GID) && attr->st_gid == static_cast<uid_t>(-1)) {
+    mask &= ~CEPH_SETATTR_GID;
+  }
+
   return _setattrx(in, &stx, mask, perms);
 }
 
@@ -6856,6 +6944,9 @@ int Client::setattr(const char *relpath, struct stat *attr, int mask,
   tout(cct) << relpath << std::endl;
   tout(cct) << mask  << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -6872,6 +6963,9 @@ int Client::setattrx(const char *relpath, struct ceph_statx *stx, int mask,
   tout(cct) << relpath << std::endl;
   tout(cct) << mask  << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms, !(flags & AT_SYMLINK_NOFOLLOW));
@@ -6887,6 +6981,9 @@ int Client::fsetattr(int fd, struct stat *attr, int mask, const UserPerm& perms)
   tout(cct) << fd << std::endl;
   tout(cct) << mask  << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -6904,6 +7001,9 @@ int Client::fsetattrx(int fd, struct ceph_statx *stx, int mask, const UserPerm&
   tout(cct) << fd << std::endl;
   tout(cct) << mask  << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -6921,6 +7021,10 @@ int Client::stat(const char *relpath, struct stat *stbuf, const UserPerm& perms,
   Mutex::Locker lock(client_lock);
   tout(cct) << "stat" << std::endl;
   tout(cct) << relpath << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms, true, mask);
@@ -6966,6 +7070,10 @@ int Client::statx(const char *relpath, struct ceph_statx *stx,
   Mutex::Locker lock(client_lock);
   tout(cct) << "statx" << std::endl;
   tout(cct) << relpath << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
 
@@ -6993,6 +7101,10 @@ int Client::lstat(const char *relpath, struct stat *stbuf,
   Mutex::Locker lock(client_lock);
   tout(cct) << "lstat" << std::endl;
   tout(cct) << relpath << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   // don't follow symlinks
@@ -7022,7 +7134,22 @@ int Client::fill_stat(Inode *in, struct stat *st, frag_info_t *dirstat, nest_inf
   st->st_dev = in->snapid;
   st->st_mode = in->mode;
   st->st_rdev = in->rdev;
-  st->st_nlink = in->nlink;
+  if (in->is_dir()) {
+    switch (in->nlink) {
+      case 0:
+        st->st_nlink = 0; /* dir is unlinked */
+        break;
+      case 1:
+        st->st_nlink = 1 /* parent dentry */
+                       + 1 /* <dir>/. */
+                       + in->dirstat.nsubdirs; /* include <dir>/. self-reference */
+        break;
+      default:
+        ceph_abort();
+    }
+  } else {
+    st->st_nlink = in->nlink;
+  }
   st->st_uid = in->uid;
   st->st_gid = in->gid;
   if (in->ctime > in->mtime) {
@@ -7089,7 +7216,22 @@ void Client::fill_statx(Inode *in, unsigned int mask, struct ceph_statx *stx)
   }
 
   if (mask & CEPH_CAP_LINK_SHARED) {
-    stx->stx_nlink = in->nlink;
+    if (in->is_dir()) {
+      switch (in->nlink) {
+        case 0:
+          stx->stx_nlink = 0; /* dir is unlinked */
+          break;
+        case 1:
+          stx->stx_nlink = 1 /* parent dentry */
+                           + 1 /* <dir>/. */
+                           + in->dirstat.nsubdirs; /* include <dir>/. self-reference */
+          break;
+        default:
+          ceph_abort();
+      }
+    } else {
+      stx->stx_nlink = in->nlink;
+    }
     stx->stx_mask |= CEPH_STATX_NLINK;
   }
 
@@ -7135,6 +7277,10 @@ int Client::chmod(const char *relpath, mode_t mode, const UserPerm& perms)
   tout(cct) << "chmod" << std::endl;
   tout(cct) << relpath << std::endl;
   tout(cct) << mode << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -7151,6 +7297,10 @@ int Client::fchmod(int fd, mode_t mode, const UserPerm& perms)
   tout(cct) << "fchmod" << std::endl;
   tout(cct) << fd << std::endl;
   tout(cct) << mode << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -7169,6 +7319,10 @@ int Client::lchmod(const char *relpath, mode_t mode, const UserPerm& perms)
   tout(cct) << "lchmod" << std::endl;
   tout(cct) << relpath << std::endl;
   tout(cct) << mode << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   // don't follow symlinks
@@ -7188,6 +7342,10 @@ int Client::chown(const char *relpath, uid_t new_uid, gid_t new_gid,
   tout(cct) << relpath << std::endl;
   tout(cct) << new_uid << std::endl;
   tout(cct) << new_gid << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -7196,10 +7354,7 @@ int Client::chown(const char *relpath, uid_t new_uid, gid_t new_gid,
   struct stat attr;
   attr.st_uid = new_uid;
   attr.st_gid = new_gid;
-  int mask = 0;
-  if (new_uid != static_cast<uid_t>(-1)) mask |= CEPH_SETATTR_UID;
-  if (new_gid != static_cast<gid_t>(-1)) mask |= CEPH_SETATTR_GID;
-  return _setattr(in, &attr, mask, perms);
+  return _setattr(in, &attr, CEPH_SETATTR_UID|CEPH_SETATTR_GID, perms);
 }
 
 int Client::fchown(int fd, uid_t new_uid, gid_t new_gid, const UserPerm& perms)
@@ -7209,6 +7364,10 @@ int Client::fchown(int fd, uid_t new_uid, gid_t new_gid, const UserPerm& perms)
   tout(cct) << fd << std::endl;
   tout(cct) << new_uid << std::endl;
   tout(cct) << new_gid << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -7233,6 +7392,10 @@ int Client::lchown(const char *relpath, uid_t new_uid, gid_t new_gid,
   tout(cct) << relpath << std::endl;
   tout(cct) << new_uid << std::endl;
   tout(cct) << new_gid << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   // don't follow symlinks
@@ -7256,6 +7419,10 @@ int Client::utime(const char *relpath, struct utimbuf *buf,
   tout(cct) << relpath << std::endl;
   tout(cct) << buf->modtime << std::endl;
   tout(cct) << buf->actime << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -7277,6 +7444,10 @@ int Client::lutime(const char *relpath, struct utimbuf *buf,
   tout(cct) << relpath << std::endl;
   tout(cct) << buf->modtime << std::endl;
   tout(cct) << buf->actime << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   // don't follow symlinks
@@ -7298,6 +7469,10 @@ int Client::flock(int fd, int operation, uint64_t owner)
   tout(cct) << fd << std::endl;
   tout(cct) << operation << std::endl;
   tout(cct) << owner << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -7310,6 +7485,10 @@ int Client::opendir(const char *relpath, dir_result_t **dirpp, const UserPerm& p
   Mutex::Locker lock(client_lock);
   tout(cct) << "opendir" << std::endl;
   tout(cct) << relpath << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms, true);
@@ -7333,7 +7512,7 @@ int Client::_opendir(Inode *in, dir_result_t **dirpp, const UserPerm& perms)
     return -ENOTDIR;
   *dirpp = new dir_result_t(in, perms);
   opened_dirs.insert(*dirpp);
-  ldout(cct, 3) << "_opendir(" << in->ino << ") = " << 0 << " (" << *dirpp << ")" << dendl;
+  ldout(cct, 8) << "_opendir(" << in->ino << ") = " << 0 << " (" << *dirpp << ")" << dendl;
   return 0;
 }
 
@@ -7364,8 +7543,11 @@ void Client::_closedir(dir_result_t *dirp)
 void Client::rewinddir(dir_result_t *dirp)
 {
   Mutex::Locker lock(client_lock);
-
   ldout(cct, 3) << "rewinddir(" << dirp << ")" << dendl;
+
+  if (unmounting)
+    return;
+
   dir_result_t *d = static_cast<dir_result_t*>(dirp);
   _readdir_drop_dirp_buffer(d);
   d->reset();
@@ -7384,6 +7566,9 @@ void Client::seekdir(dir_result_t *dirp, loff_t offset)
 
   ldout(cct, 3) << "seekdir(" << dirp << ", " << offset << ")" << dendl;
 
+  if (unmounting)
+    return;
+
   if (offset == dirp->offset)
     return;
 
@@ -7511,7 +7696,7 @@ int Client::_readdir_get_frag(dir_result_t *dirp)
   req->head.args.readdir.frag = fg;
   req->head.args.readdir.flags = CEPH_READDIR_REPLY_BITFLAGS;
   if (dirp->last_name.length()) {
-    req->path2.set_path(dirp->last_name.c_str());
+    req->path2.set_path(dirp->last_name);
   } else if (dirp->hash_order()) {
     req->head.args.readdir.offset_hash = dirp->offset_high();
   }
@@ -7617,6 +7802,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
     else
       dirp->next_offset = dirp->offset_low();
     dirp->last_name = dn_name; // we successfully returned this one; update!
+    dirp->release_count = 0; // last_name no longer match cache index
     if (r > 0)
       return r;
   }
@@ -7633,6 +7819,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
 
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   dir_result_t *dirp = static_cast<dir_result_t*>(d);
 
   ldout(cct, 10) << "readdir_r_cb " << *dirp->inode << " offset " << hex << dirp->offset
@@ -7685,10 +7874,10 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
     if (diri->dn_set.empty())
       in = diri;
     else
-      in = diri->get_first_parent()->inode;
+      in = diri->get_first_parent()->dir->parent_inode;
 
     int r;
-    r = _getattr(diri, caps, dirp->perms);
+    r = _getattr(in, caps, dirp->perms);
     if (r < 0)
       return r;
 
@@ -7720,7 +7909,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
           << dendl;
   if (dirp->inode->snapid != CEPH_SNAPDIR &&
       dirp->inode->is_complete_and_ordered() &&
-      dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
+      dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED, true)) {
     int err = _readdir_cache_cb(dirp, cb, p, caps, getref);
     if (err != -EAGAIN)
       return err;
@@ -8010,6 +8199,9 @@ int Client::open(const char *relpath, int flags, const UserPerm& perms,
   tout(cct) << relpath << std::endl;
   tout(cct) << ceph_flags_sys2wire(flags) << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *fh = NULL;
 
 #if defined(__linux__) && defined(O_PATH)
@@ -8094,6 +8286,9 @@ int Client::lookup_hash(inodeno_t ino, inodeno_t dirino, const char *name,
   Mutex::Locker lock(client_lock);
   ldout(cct, 3) << "lookup_hash enter(" << ino << ", #" << dirino << "/" << name << ")" << dendl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPHASH);
   filepath path(ino);
   req->set_filepath(path);
@@ -8119,10 +8314,12 @@ int Client::lookup_hash(inodeno_t ino, inodeno_t dirino, const char *name,
  * the resulting Inode object in one operation, so that caller
  * can safely assume inode will still be there after return.
  */
-int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
+int Client::_lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
 {
-  Mutex::Locker lock(client_lock);
-  ldout(cct, 3) << "lookup_ino enter(" << ino << ")" << dendl;
+  ldout(cct, 8) << "lookup_ino enter(" << ino << ")" << dendl;
+
+  if (unmounting)
+    return -ENOTCONN;
 
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPINO);
   filepath path(ino);
@@ -8136,32 +8333,38 @@ int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
     *inode = p->second;
     _ll_get(*inode);
   }
-  ldout(cct, 3) << "lookup_ino exit(" << ino << ") = " << r << dendl;
+  ldout(cct, 8) << "lookup_ino exit(" << ino << ") = " << r << dendl;
   return r;
 }
 
-
+int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
+{
+  Mutex::Locker lock(client_lock);
+  return _lookup_ino(ino, perms, inode);
+}
 
 /**
  * Find the parent inode of `ino` and insert it into
  * our cache.  Conditionally also set `parent` to a referenced
  * Inode* if caller provides non-NULL value.
  */
-int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
+int Client::_lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
 {
-  Mutex::Locker lock(client_lock);
-  ldout(cct, 3) << "lookup_parent enter(" << ino->ino << ")" << dendl;
+  ldout(cct, 8) << "lookup_parent enter(" << ino->ino << ")" << dendl;
+
+  if (unmounting)
+    return -ENOTCONN;
 
   if (!ino->dn_set.empty()) {
     // if we exposed the parent here, we'd need to check permissions,
     // but right now we just rely on the MDS doing so in make_request
-    ldout(cct, 3) << "lookup_parent dentry already present" << dendl;
+    ldout(cct, 8) << "lookup_parent dentry already present" << dendl;
     return 0;
   }
   
   if (ino->is_root()) {
     *parent = NULL;
-    ldout(cct, 3) << "ino is root, no parent" << dendl;
+    ldout(cct, 8) << "ino is root, no parent" << dendl;
     return -EINVAL;
   }
 
@@ -8176,27 +8379,33 @@ int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
     if (r == 0) {
       *parent = target.get();
       _ll_get(*parent);
-      ldout(cct, 3) << "lookup_parent found parent " << (*parent)->ino << dendl;
+      ldout(cct, 8) << "lookup_parent found parent " << (*parent)->ino << dendl;
     } else {
       *parent = NULL;
     }
   }
-  ldout(cct, 3) << "lookup_parent exit(" << ino->ino << ") = " << r << dendl;
+  ldout(cct, 8) << "lookup_parent exit(" << ino->ino << ") = " << r << dendl;
   return r;
 }
 
+int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
+{
+  Mutex::Locker lock(client_lock);
+  return _lookup_parent(ino, perms, parent);
+}
 
 /**
  * Populate the parent dentry for `ino`, provided it is
  * a child of `parent`.
  */
-int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
+int Client::_lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
 {
   assert(parent->is_dir());
-
-  Mutex::Locker lock(client_lock);
   ldout(cct, 3) << "lookup_name enter(" << ino->ino << ")" << dendl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPNAME);
   req->set_filepath2(filepath(parent->ino));
   req->set_filepath(filepath(ino->ino));
@@ -8207,6 +8416,11 @@ int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
   return r;
 }
 
+int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
+{
+  Mutex::Locker lock(client_lock);
+  return _lookup_name(ino, parent, perms);
+}
 
  Fh *Client::_create_fh(Inode *in, int flags, int cmode, const UserPerm& perms)
 {
@@ -8250,7 +8464,9 @@ int Client::_release_fh(Fh *f)
   //ldout(cct, 3) << "op: client->close(open_files[ " << fh << " ]);" << dendl;
   //ldout(cct, 3) << "op: open_files.erase( " << fh << " );" << dendl;
   Inode *in = f->inode.get();
-  ldout(cct, 5) << "_release_fh " << f << " mode " << f->mode << " on " << *in << dendl;
+  ldout(cct, 8) << "_release_fh " << f << " mode " << f->mode << " on " << *in << dendl;
+
+  in->unset_deleg(f);
 
   if (in->snapid == CEPH_NOSNAP) {
     if (in->put_open_ref(f->mode)) {
@@ -8303,11 +8519,11 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
 
   in->get_open_ref(cmode);  // make note of pending open, since it effects _wanted_ caps.
 
-  if ((flags & O_TRUNC) == 0 &&
-      in->caps_issued_mask(want)) {
+  if ((flags & O_TRUNC) == 0 && in->caps_issued_mask(want)) {
     // update wanted?
     check_caps(in, CHECK_CAPS_NODELAY);
   } else {
+
     MetaRequest *req = new MetaRequest(CEPH_MDS_OP_OPEN);
     filepath path;
     in->make_nosnap_relative_path(path);
@@ -8322,6 +8538,34 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
     req->head.args.open.old_size = in->size;   // for O_TRUNC
     req->set_inode(in);
     result = make_request(req, perms);
+
+    /*
+     * NFS expects that delegations will be broken on a conflicting open,
+     * not just when there is actual conflicting access to the file. SMB leases
+     * and oplocks also have similar semantics.
+     *
+     * Ensure that clients that have delegations enabled will wait on minimal
+     * caps during open, just to ensure that other clients holding delegations
+     * return theirs first.
+     */
+    if (deleg_timeout && result == 0) {
+      int need = 0, have;
+
+      if (cmode & CEPH_FILE_MODE_WR)
+        need |= CEPH_CAP_FILE_WR;
+      if (cmode & CEPH_FILE_MODE_RD)
+        need |= CEPH_CAP_FILE_RD;
+
+      result = get_caps(in, need, want, &have, -1);
+      if (result < 0) {
+       ldout(cct, 8) << "Unable to get caps after open of inode " << *in <<
+                         " . Denying open: " <<
+                         cpp_strerror(result) << dendl;
+       in->put_open_ref(cmode);
+      } else {
+       put_cap_ref(in, need);
+      }
+    }
   }
 
   // success?
@@ -8382,6 +8626,9 @@ int Client::close(int fd)
   tout(cct) << "close" << std::endl;
   tout(cct) << fd << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *fh = get_filehandle(fd);
   if (!fh)
     return -EBADF;
@@ -8404,6 +8651,9 @@ loff_t Client::lseek(int fd, loff_t offset, int whence)
   tout(cct) << offset << std::endl;
   tout(cct) << whence << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -8439,7 +8689,7 @@ loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
     ceph_abort();
   }
 
-  ldout(cct, 3) << "_lseek(" << f << ", " << offset << ", " << whence << ") = " << f->pos << dendl;
+  ldout(cct, 8) << "_lseek(" << f << ", " << offset << ", " << whence << ") = " << f->pos << dendl;
   return f->pos;
 }
 
@@ -8525,6 +8775,9 @@ int Client::read(int fd, char *buf, loff_t size, loff_t offset)
   tout(cct) << size << std::endl;
   tout(cct) << offset << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -8677,7 +8930,7 @@ done:
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
       r = uninline_ret;
@@ -8868,6 +9121,9 @@ int Client::write(int fd, const char *buf, loff_t size, loff_t offset)
   tout(cct) << size << std::endl;
   tout(cct) << offset << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *fh = get_filehandle(fd);
   if (!fh)
     return -EBADF;
@@ -8893,6 +9149,9 @@ int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, in
     tout(cct) << fd << std::endl;
     tout(cct) << offset << std::endl;
 
+    if (unmounting)
+     return -ENOTCONN;
+
     Fh *fh = get_filehandle(fd);
     if (!fh)
         return -EBADF;
@@ -8955,8 +9214,9 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
 
   // check quota
   uint64_t endoff = offset + size;
-  if (endoff > in->size && is_quota_bytes_exceeded(in, endoff - in->size,
-                                                  f->actor_perms)) {
+  std::list<InodeRef> quota_roots;
+  if (endoff > in->size &&
+      is_quota_bytes_exceeded(in, endoff - in->size, f->actor_perms, &quota_roots)) {
     return -EDQUOT;
   }
 
@@ -9015,8 +9275,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
     return r;
 
   /* clear the setuid/setgid bits, if any */
-  if (unlikely((in->mode & S_ISUID) ||
-              (in->mode & (S_ISGID | S_IXGRP)) == (S_ISGID | S_IXGRP))) {
+  if (unlikely(in->mode & (S_ISUID|S_ISGID)) && size > 0) {
     struct ceph_statx stx = { 0 };
 
     put_cap_ref(in, CEPH_CAP_AUTH_SHARED);
@@ -9132,9 +9391,9 @@ success:
   // extend file?
   if (totalwritten + offset > in->size) {
     in->size = totalwritten + offset;
-    mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+    in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-    if (is_quota_bytes_approaching(in, f->actor_perms)) {
+    if (is_quota_bytes_approaching(in, quota_roots)) {
       check_caps(in, CHECK_CAPS_NODELAY);
     } else if (is_max_size_approaching(in)) {
       check_caps(in, 0);
@@ -9146,9 +9405,9 @@ success:
   }
 
   // mtime
-  in->mtime = ceph_clock_now();
+  in->mtime = in->ctime = ceph_clock_now();
   in->change_attr++;
-  mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+  in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
 done:
 
@@ -9163,7 +9422,7 @@ done:
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
       r = uninline_ret;
@@ -9201,6 +9460,9 @@ int Client::ftruncate(int fd, loff_t length, const UserPerm& perms)
   tout(cct) << fd << std::endl;
   tout(cct) << length << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -9220,6 +9482,9 @@ int Client::fsync(int fd, bool syncdataonly)
   tout(cct) << fd << std::endl;
   tout(cct) << syncdataonly << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -9232,14 +9497,14 @@ int Client::fsync(int fd, bool syncdataonly)
     // The IOs in this fsync were okay, but maybe something happened
     // in the background that we shoudl be reporting?
     r = f->take_async_err();
-    ldout(cct, 3) << "fsync(" << fd << ", " << syncdataonly
+    ldout(cct, 5) << "fsync(" << fd << ", " << syncdataonly
                   << ") = 0, async_err = " << r << dendl;
   } else {
     // Assume that an error we encountered during fsync, even reported
     // synchronously, would also have applied the error to the Fh, and we
     // should clear it here to avoid returning the same error again on next
     // call.
-    ldout(cct, 3) << "fsync(" << fd << ", " << syncdataonly << ") = "
+    ldout(cct, 5) << "fsync(" << fd << ", " << syncdataonly << ") = "
                   << r << dendl;
     f->take_async_err();
   }
@@ -9256,7 +9521,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   ceph_tid_t flush_tid = 0;
   InodeRef tmp_ref;
 
-  ldout(cct, 3) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl;
+  ldout(cct, 8) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl;
   
   if (cct->_conf->client_oc) {
     object_cacher_completion = new C_SafeCond(&lock, &cond, &done, &r);
@@ -9272,6 +9537,8 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   } else ldout(cct, 10) << "no metadata needs to commit" << dendl;
 
   if (!syncdataonly && !in->unsafe_ops.empty()) {
+    flush_mdlog_sync();
+
     MetaRequest *req = in->unsafe_ops.back();
     ldout(cct, 15) << "waiting on unsafe requests, last tid " << req->get_tid() <<  dendl;
 
@@ -9304,7 +9571,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
 
     ldout(cct, 10) << "ino " << in->ino << " has no uncommitted writes" << dendl;
   } else {
-    ldout(cct, 1) << "ino " << in->ino << " failed to commit to disk! "
+    ldout(cct, 8) << "ino " << in->ino << " failed to commit to disk! "
                  << cpp_strerror(-r) << dendl;
   }
 
@@ -9313,7 +9580,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
 
 int Client::_fsync(Fh *f, bool syncdataonly)
 {
-  ldout(cct, 3) << "_fsync(" << f << ", " << (syncdataonly ? "dataonly)":"data+metadata)") << dendl;
+  ldout(cct, 8) << "_fsync(" << f << ", " << (syncdataonly ? "dataonly)":"data+metadata)") << dendl;
   return _fsync(f->inode.get(), syncdataonly);
 }
 
@@ -9323,6 +9590,9 @@ int Client::fstat(int fd, struct stat *stbuf, const UserPerm& perms, int mask)
   tout(cct) << "fstat mask " << hex << mask << dec << std::endl;
   tout(cct) << fd << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -9330,7 +9600,7 @@ int Client::fstat(int fd, struct stat *stbuf, const UserPerm& perms, int mask)
   if (r < 0)
     return r;
   fill_stat(f->inode, stbuf, NULL);
-  ldout(cct, 3) << "fstat(" << fd << ", " << stbuf << ") = " << r << dendl;
+  ldout(cct, 5) << "fstat(" << fd << ", " << stbuf << ") = " << r << dendl;
   return r;
 }
 
@@ -9341,6 +9611,9 @@ int Client::fstatx(int fd, struct ceph_statx *stx, const UserPerm& perms,
   tout(cct) << "fstatx flags " << hex << flags << " want " << want << dec << std::endl;
   tout(cct) << fd << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -9348,7 +9621,7 @@ int Client::fstatx(int fd, struct ceph_statx *stx, const UserPerm& perms,
   unsigned mask = statx_to_mask(flags, want);
 
   int r = 0;
-  if (mask && !f->inode->caps_issued_mask(mask)) {
+  if (mask && !f->inode->caps_issued_mask(mask, true)) {
     r = _getattr(f->inode, mask, perms);
     if (r < 0) {
       ldout(cct, 3) << "fstatx exit on error!" << dendl;
@@ -9369,6 +9642,10 @@ int Client::chdir(const char *relpath, std::string &new_cwd,
   Mutex::Locker lock(client_lock);
   tout(cct) << "chdir" << std::endl;
   tout(cct) << relpath << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -9378,11 +9655,11 @@ int Client::chdir(const char *relpath, std::string &new_cwd,
     cwd.swap(in);
   ldout(cct, 3) << "chdir(" << relpath << ")  cwd now " << cwd->ino << dendl;
 
-  getcwd(new_cwd, perms);
+  _getcwd(new_cwd, perms);
   return 0;
 }
 
-void Client::getcwd(string& dir, const UserPerm& perms)
+void Client::_getcwd(string& dir, const UserPerm& perms)
 {
   filepath path;
   ldout(cct, 10) << "getcwd " << *cwd << dendl;
@@ -9422,18 +9699,37 @@ void Client::getcwd(string& dir, const UserPerm& perms)
   dir += path.get_path();
 }
 
+void Client::getcwd(string& dir, const UserPerm& perms)
+{
+  Mutex::Locker l(client_lock);
+  if (!unmounting)
+    _getcwd(dir, perms);
+}
+
 int Client::statfs(const char *path, struct statvfs *stbuf,
                   const UserPerm& perms)
 {
   Mutex::Locker l(client_lock);
   tout(cct) << "statfs" << std::endl;
+  unsigned long int total_files_on_fs;
+
+  if (unmounting)
+    return -ENOTCONN;
 
   ceph_statfs stats;
   C_SaferCond cond;
-  objecter->get_fs_stats(stats, &cond);
+
+  const vector<int64_t> &data_pools = mdsmap->get_data_pools();
+  if (data_pools.size() == 1) {
+    objecter->get_fs_stats(stats, data_pools[0], &cond);
+  } else {
+    objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
+  }
 
   client_lock.Unlock();
   int rval = cond.wait();
+  assert(root);
+  total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
   client_lock.Lock();
 
   if (rval < 0) {
@@ -9455,8 +9751,8 @@ int Client::statfs(const char *path, struct statvfs *stbuf,
   const int CEPH_BLOCK_SHIFT = 22;
   stbuf->f_frsize = 1 << CEPH_BLOCK_SHIFT;
   stbuf->f_bsize = 1 << CEPH_BLOCK_SHIFT;
-  stbuf->f_files = stats.num_objects;
-  stbuf->f_ffree = -1;
+  stbuf->f_files = total_files_on_fs;
+  stbuf->f_ffree = 0;
   stbuf->f_favail = -1;
   stbuf->f_fsid = -1;       // ??
   stbuf->f_flag = 0;        // ??
@@ -9501,7 +9797,7 @@ int Client::statfs(const char *path, struct statvfs *stbuf,
     stbuf->f_bfree = free;
     stbuf->f_bavail = free;
   } else {
-    // General case: report the overall RADOS cluster's statistics.  Because
+    // General case: report the cluster statistics returned from RADOS. Because
     // multiple pools may be used without one filesystem namespace via
     // layouts, this is the most correct thing we can do.
     stbuf->f_blocks = stats.kb >> (CEPH_BLOCK_SHIFT - 10);
@@ -9826,7 +10122,6 @@ void Client::ll_register_callbacks(struct client_callback_args *args)
   ldout(cct, 10) << "ll_register_callbacks cb " << args->handle
                 << " invalidate_ino_cb " << args->ino_cb
                 << " invalidate_dentry_cb " << args->dentry_cb
-                << " getgroups_cb" << args->getgroups_cb
                 << " switch_interrupt_cb " << args->switch_intr_cb
                 << " remount_cb " << args->remount_cb
                 << dendl;
@@ -9847,7 +10142,6 @@ void Client::ll_register_callbacks(struct client_callback_args *args)
     remount_cb = args->remount_cb;
     remount_finisher.start();
   }
-  getgroups_cb = args->getgroups_cb;
   umask_cb = args->umask_cb;
 }
 
@@ -9860,21 +10154,19 @@ int Client::test_dentry_handling(bool can_invalidate)
   if (can_invalidate_dentries) {
     assert(dentry_invalidate_cb);
     ldout(cct, 1) << "using dentry_invalidate_cb" << dendl;
+    r = 0;
   } else if (remount_cb) {
     ldout(cct, 1) << "using remount_cb" << dendl;
-    int s = remount_cb(callback_handle);
-    if (s) {
-      lderr(cct) << "Failed to invoke remount, needed to ensure kernel dcache consistency"
-                << dendl;
-    }
-    if (cct->_conf->client_die_on_failed_remount) {
-      require_remount = true;
-      r = s;
-    }
-  } else {
-    lderr(cct) << "no method to invalidate kernel dentry cache; expect issues!" << dendl;
-    if (cct->_conf->client_die_on_failed_remount)
+    r = _do_remount(false);
+  }
+  if (r) {
+    bool should_abort = cct->_conf->get_val<bool>("client_die_on_failed_dentry_invalidate");
+    if (should_abort) {
+      lderr(cct) << "no method to invalidate kernel dentry cache; quitting!" << dendl;
       ceph_abort();
+    } else {
+      lderr(cct) << "no method to invalidate kernel dentry cache; expect issues!" << dendl;
+    }
   }
   return r;
 }
@@ -9917,6 +10209,10 @@ int Client::_sync_fs()
 int Client::sync_fs()
 {
   Mutex::Locker l(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   return _sync_fs();
 }
 
@@ -9967,6 +10263,10 @@ int Client::lazyio_synchronize(int fd, loff_t offset, size_t count)
 int Client::mksnap(const char *relpath, const char *name, const UserPerm& perm)
 {
   Mutex::Locker l(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perm);
@@ -9980,9 +10280,14 @@ int Client::mksnap(const char *relpath, const char *name, const UserPerm& perm)
   Inode *snapdir = open_snapdir(in.get());
   return _mkdir(snapdir, name, 0, perm);
 }
+
 int Client::rmsnap(const char *relpath, const char *name, const UserPerm& perms)
 {
   Mutex::Locker l(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -10004,6 +10309,9 @@ int Client::get_caps_issued(int fd) {
 
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -10014,6 +10322,10 @@ int Client::get_caps_issued(int fd) {
 int Client::get_caps_issued(const char *path, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath p(path);
   InodeRef in;
   int r = path_walk(p, &in, perms, true);
@@ -10066,6 +10378,9 @@ int Client::ll_lookup(Inode *parent, const char *name, struct stat *attr,
   tout(cct) << "ll_lookup" << std::endl;
   tout(cct) << name << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int r = 0;
   if (!cct->_conf->fuse_default_permissions) {
     r = may_lookup(parent, perms);
@@ -10094,6 +10409,51 @@ int Client::ll_lookup(Inode *parent, const char *name, struct stat *attr,
   return r;
 }
 
+int Client::ll_lookup_inode(
+    struct inodeno_t ino,
+    const UserPerm& perms,
+    Inode **inode)
+{
+  Mutex::Locker lock(client_lock);
+  ldout(cct, 3) << "ll_lookup_inode " << ino  << dendl;
+   
+  // Num1: get inode and *inode
+  int r = _lookup_ino(ino, perms, inode);
+  if (r) {
+    return r;
+  }
+  assert(inode != NULL);
+  assert(*inode != NULL);
+
+  // Num2: Request the parent inode, so that we can look up the name
+  Inode *parent;
+  r = _lookup_parent(*inode, perms, &parent);
+  if (r && r != -EINVAL) {
+    // Unexpected error
+    _ll_forget(*inode, 1);  
+    return r;
+  } else if (r == -EINVAL) {
+    // EINVAL indicates node without parents (root), drop out now
+    // and don't try to look up the non-existent dentry.
+    return 0;
+  }
+  // FIXME: I don't think this works; lookup_parent() returns 0 if the parent
+  // is already in cache
+  assert(parent != NULL);
+
+  // Num3: Finally, get the name (dentry) of the requested inode
+  r = _lookup_name(*inode, parent, perms);
+  if (r) {
+    // Unexpected error
+    _ll_forget(parent, 1);
+    _ll_forget(*inode, 1);
+    return r;
+  }
+
+  _ll_forget(parent, 1);
+  return 0;
+}
+
 int Client::ll_lookupx(Inode *parent, const char *name, Inode **out,
                       struct ceph_statx *stx, unsigned want, unsigned flags,
                       const UserPerm& perms)
@@ -10104,6 +10464,9 @@ int Client::ll_lookupx(Inode *parent, const char *name, Inode **out,
   tout(cct) << "ll_lookupx" << std::endl;
   tout(cct) << name << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int r = 0;
   if (!cct->_conf->fuse_default_permissions) {
     r = may_lookup(parent, perms);
@@ -10136,6 +10499,10 @@ int Client::ll_walk(const char* name, Inode **out, struct ceph_statx *stx,
                    unsigned int want, unsigned int flags, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath fp(name, 0);
   InodeRef in;
   int rc;
@@ -10193,6 +10560,7 @@ int Client::_ll_put(Inode *in, int num)
 void Client::_ll_drop_pins()
 {
   ldout(cct, 10) << "_ll_drop_pins" << dendl;
+  std::set<InodeRef> to_be_put; //this set will be deconstructed item by item when exit
   ceph::unordered_map<vinodeno_t, Inode*>::iterator next;
   for (ceph::unordered_map<vinodeno_t, Inode*>::iterator it = inode_map.begin();
        it != inode_map.end();
@@ -10200,21 +10568,26 @@ void Client::_ll_drop_pins()
     Inode *in = it->second;
     next = it;
     ++next;
-    if (in->ll_ref)
+    if (in->ll_ref){
+      to_be_put.insert(in);
       _ll_put(in, in->ll_ref);
+    }
   }
 }
 
-bool Client::ll_forget(Inode *in, int count)
+bool Client::_ll_forget(Inode *in, int count)
 {
-  Mutex::Locker lock(client_lock);
   inodeno_t ino = _get_inodeno(in);
 
-  ldout(cct, 3) << "ll_forget " << ino << " " << count << dendl;
+  ldout(cct, 8) << "ll_forget " << ino << " " << count << dendl;
   tout(cct) << "ll_forget" << std::endl;
   tout(cct) << ino.val << std::endl;
   tout(cct) << count << std::endl;
 
+  // Ignore forget if we're no longer mounted
+  if (unmounting)
+    return true;
+
   if (ino == 1) return true;  // ignore forget on root.
 
   bool last = false;
@@ -10231,6 +10604,12 @@ bool Client::ll_forget(Inode *in, int count)
   return last;
 }
 
+bool Client::ll_forget(Inode *in, int count)
+{
+  Mutex::Locker lock(client_lock);
+  return _ll_forget(in, count);
+}
+
 bool Client::ll_put(Inode *in)
 {
   /* ll_forget already takes the lock */
@@ -10246,6 +10625,10 @@ snapid_t Client::ll_get_snapid(Inode *in)
 Inode *Client::ll_get_inode(ino_t ino)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return NULL;
+
   vinodeno_t vino = _map_faked_ino(ino);
   unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
   if (p == inode_map.end())
@@ -10258,6 +10641,10 @@ Inode *Client::ll_get_inode(ino_t ino)
 Inode *Client::ll_get_inode(vinodeno_t vino)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return NULL;
+
   unordered_map<vinodeno_t,Inode*>::iterator p = inode_map.find(vino);
   if (p == inode_map.end())
     return NULL;
@@ -10270,7 +10657,7 @@ int Client::_ll_getattr(Inode *in, int caps, const UserPerm& perms)
 {
   vinodeno_t vino = _get_vino(in);
 
-  ldout(cct, 3) << "ll_getattr " << vino << dendl;
+  ldout(cct, 8) << "ll_getattr " << vino << dendl;
   tout(cct) << "ll_getattr" << std::endl;
   tout(cct) << vino.ino.val << std::endl;
 
@@ -10284,6 +10671,9 @@ int Client::ll_getattr(Inode *in, struct stat *attr, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int res = _ll_getattr(in, CEPH_STAT_CAP_INODE_ALL, perms);
 
   if (res == 0)
@@ -10297,10 +10687,13 @@ int Client::ll_getattrx(Inode *in, struct ceph_statx *stx, unsigned int want,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int res = 0;
   unsigned mask = statx_to_mask(flags, want);
 
-  if (mask && !in->caps_issued_mask(mask))
+  if (mask && !in->caps_issued_mask(mask, true))
     res = _ll_getattr(in, mask, perms);
 
   if (res == 0)
@@ -10314,7 +10707,7 @@ int Client::_ll_setattrx(Inode *in, struct ceph_statx *stx, int mask,
 {
   vinodeno_t vino = _get_vino(in);
 
-  ldout(cct, 3) << "ll_setattrx " << vino << " mask " << hex << mask << dec
+  ldout(cct, 8) << "ll_setattrx " << vino << " mask " << hex << mask << dec
                << dendl;
   tout(cct) << "ll_setattrx" << std::endl;
   tout(cct) << vino.ino.val << std::endl;
@@ -10342,6 +10735,10 @@ int Client::ll_setattrx(Inode *in, struct ceph_statx *stx, int mask,
                        const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef target(in);
   int res = _ll_setattrx(in, stx, mask, perms, &target);
   if (res == 0) {
@@ -10360,6 +10757,10 @@ int Client::ll_setattr(Inode *in, struct stat *attr, int mask,
   stat_to_statx(attr, &stx);
 
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef target(in);
   int res = _ll_setattrx(in, &stx, mask, perms, &target);
   if (res == 0) {
@@ -10379,6 +10780,10 @@ int Client::getxattr(const char *path, const char *name, void *value, size_t siz
                     const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, true, CEPH_STAT_CAP_XATTR);
   if (r < 0)
@@ -10390,6 +10795,10 @@ int Client::lgetxattr(const char *path, const char *name, void *value, size_t si
                      const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, false, CEPH_STAT_CAP_XATTR);
   if (r < 0)
@@ -10401,6 +10810,10 @@ int Client::fgetxattr(int fd, const char *name, void *value, size_t size,
                      const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -10411,6 +10824,10 @@ int Client::listxattr(const char *path, char *list, size_t size,
                      const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, true, CEPH_STAT_CAP_XATTR);
   if (r < 0)
@@ -10422,6 +10839,10 @@ int Client::llistxattr(const char *path, char *list, size_t size,
                       const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, false, CEPH_STAT_CAP_XATTR);
   if (r < 0)
@@ -10432,6 +10853,10 @@ int Client::llistxattr(const char *path, char *list, size_t size,
 int Client::flistxattr(int fd, char *list, size_t size, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -10442,6 +10867,10 @@ int Client::removexattr(const char *path, const char *name,
                        const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, true);
   if (r < 0)
@@ -10453,6 +10882,10 @@ int Client::lremovexattr(const char *path, const char *name,
                         const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, false);
   if (r < 0)
@@ -10463,6 +10896,10 @@ int Client::lremovexattr(const char *path, const char *name,
 int Client::fremovexattr(int fd, const char *name, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -10475,6 +10912,10 @@ int Client::setxattr(const char *path, const char *name, const void *value,
   _setxattr_maybe_wait_for_osdmap(name, value, size);
 
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, true);
   if (r < 0)
@@ -10488,6 +10929,10 @@ int Client::lsetxattr(const char *path, const char *name, const void *value,
   _setxattr_maybe_wait_for_osdmap(name, value, size);
 
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   InodeRef in;
   int r = Client::path_walk(path, &in, perms, false);
   if (r < 0)
@@ -10501,6 +10946,10 @@ int Client::fsetxattr(int fd, const char *name, const void *value, size_t size,
   _setxattr_maybe_wait_for_osdmap(name, value, size);
 
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -10518,7 +10967,11 @@ int Client::_getxattr(Inode *in, const char *name, void *value, size_t size,
 
     // Do a force getattr to get the latest quota before returning
     // a value to userspace.
-    r = _getattr(in, 0, perms, true);
+    int flags = 0;
+    if (vxattr->flags & VXATTR_RSTAT) {
+      flags |= CEPH_STAT_RSTAT;
+    }
+    r = _getattr(in, flags, perms, true);
     if (r != 0) {
       // Error from getattr!
       return r;
@@ -10562,7 +11015,7 @@ int Client::_getxattr(Inode *in, const char *name, void *value, size_t size,
     }
   }
  out:
-  ldout(cct, 3) << "_getxattr(" << in->ino << ", \"" << name << "\", " << size << ") = " << r << dendl;
+  ldout(cct, 8) << "_getxattr(" << in->ino << ", \"" << name << "\", " << size << ") = " << r << dendl;
   return r;
 }
 
@@ -10582,6 +11035,9 @@ int Client::ll_getxattr(Inode *in, const char *name, void *value,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_getxattr " << vino << " " << name << " size " << size << dendl;
@@ -10639,7 +11095,7 @@ int Client::_listxattr(Inode *in, char *name, size_t size,
        r = -ERANGE;
     }
   }
-  ldout(cct, 3) << "_listxattr(" << in->ino << ", " << size << ") = " << r << dendl;
+  ldout(cct, 8) << "_listxattr(" << in->ino << ", " << size << ") = " << r << dendl;
   return r;
 }
 
@@ -10648,6 +11104,9 @@ int Client::ll_listxattr(Inode *in, char *names, size_t size,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_listxattr " << vino << " size " << size << dendl;
@@ -10830,6 +11289,9 @@ int Client::ll_setxattr(Inode *in, const char *name, const void *value,
 
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_setxattr " << vino << " " << name << " size " << size << dendl;
@@ -10873,7 +11335,7 @@ int Client::_removexattr(Inode *in, const char *name, const UserPerm& perms)
   int res = make_request(req, perms);
 
   trim_cache();
-  ldout(cct, 3) << "_removexattr(" << in->ino << ", \"" << name << "\") = " << res << dendl;
+  ldout(cct, 8) << "_removexattr(" << in->ino << ", \"" << name << "\") = " << res << dendl;
   return res;
 }
 
@@ -10891,6 +11353,9 @@ int Client::ll_removexattr(Inode *in, const char *name, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_removexattr " << vino << " " << name << dendl;
@@ -11023,6 +11488,16 @@ size_t Client::_vxattrcb_dir_rctime(Inode *in, char *val, size_t size)
   readonly: true,                                              \
   hidden: false,                                               \
   exists_cb: NULL,                                             \
+  flags: 0,                                                     \
+}
+#define XATTR_NAME_CEPH2(_type, _name, _flags)                 \
+{                                                              \
+  name: CEPH_XATTR_NAME(_type, _name),                         \
+  getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
+  readonly: true,                                              \
+  hidden: false,                                               \
+  exists_cb: NULL,                                             \
+  flags: _flags,                                               \
 }
 #define XATTR_LAYOUT_FIELD(_type, _name, _field)               \
 {                                                              \
@@ -11031,6 +11506,7 @@ size_t Client::_vxattrcb_dir_rctime(Inode *in, char *val, size_t size)
   readonly: false,                                             \
   hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_layout_exists,                 \
+  flags: 0,                                                     \
 }
 #define XATTR_QUOTA_FIELD(_type, _name)                                \
 {                                                              \
@@ -11039,6 +11515,7 @@ size_t Client::_vxattrcb_dir_rctime(Inode *in, char *val, size_t size)
   readonly: false,                                             \
   hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_quota_exists,                  \
+  flags: 0,                                                     \
 }
 
 const Client::VXattr Client::_dir_vxattrs[] = {
@@ -11048,6 +11525,7 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     readonly: false,
     hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
+    flags: 0,
   },
   XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
   XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
@@ -11057,17 +11535,18 @@ const Client::VXattr Client::_dir_vxattrs[] = {
   XATTR_NAME_CEPH(dir, entries),
   XATTR_NAME_CEPH(dir, files),
   XATTR_NAME_CEPH(dir, subdirs),
-  XATTR_NAME_CEPH(dir, rentries),
-  XATTR_NAME_CEPH(dir, rfiles),
-  XATTR_NAME_CEPH(dir, rsubdirs),
-  XATTR_NAME_CEPH(dir, rbytes),
-  XATTR_NAME_CEPH(dir, rctime),
+  XATTR_NAME_CEPH2(dir, rentries, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rfiles, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rsubdirs, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rbytes, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rctime, VXATTR_RSTAT),
   {
     name: "ceph.quota",
     getxattr_cb: &Client::_vxattrcb_quota,
     readonly: false,
     hidden: true,
     exists_cb: &Client::_vxattrcb_quota_exists,
+    flags: 0,
   },
   XATTR_QUOTA_FIELD(quota, max_bytes),
   XATTR_QUOTA_FIELD(quota, max_files),
@@ -11081,6 +11560,7 @@ const Client::VXattr Client::_file_vxattrs[] = {
     readonly: false,
     hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
+    flags: 0,
   },
   XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
   XATTR_LAYOUT_FIELD(file, layout, stripe_count),
@@ -11129,6 +11609,9 @@ int Client::ll_readlink(Inode *in, char *buf, size_t buflen, const UserPerm& per
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_readlink " << vino << dendl;
@@ -11149,7 +11632,7 @@ int Client::ll_readlink(Inode *in, char *buf, size_t buflen, const UserPerm& per
 int Client::_mknod(Inode *dir, const char *name, mode_t mode, dev_t rdev,
                   const UserPerm& perms, InodeRef *inp)
 {
-  ldout(cct, 3) << "_mknod(" << dir->ino << " " << name << ", 0" << oct
+  ldout(cct, 8) << "_mknod(" << dir->ino << " " << name << ", 0" << oct
                << mode << dec << ", " << rdev << ", uid " << perms.uid()
                << ", gid " << perms.gid() << ")" << dendl;
 
@@ -11192,7 +11675,7 @@ int Client::_mknod(Inode *dir, const char *name, mode_t mode, dev_t rdev,
 
   trim_cache();
 
-  ldout(cct, 3) << "mknod(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
+  ldout(cct, 8) << "mknod(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11206,6 +11689,9 @@ int Client::ll_mknod(Inode *parent, const char *name, mode_t mode,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
 
   ldout(cct, 3) << "ll_mknod " << vparent << " " << name << dendl;
@@ -11242,6 +11728,9 @@ int Client::ll_mknodx(Inode *parent, const char *name, mode_t mode,
   unsigned caps = statx_to_mask(flags, want);
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
 
   ldout(cct, 3) << "ll_mknodx " << vparent << " " << name << dendl;
@@ -11275,7 +11764,7 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
                    int object_size, const char *data_pool, bool *created,
                    const UserPerm& perms)
 {
-  ldout(cct, 3) << "_create(" << dir->ino << " " << name << ", 0" << oct <<
+  ldout(cct, 8) << "_create(" << dir->ino << " " << name << ", 0" << oct <<
     mode << dec << ")" << dendl;
 
   if (strlen(name) > NAME_MAX)
@@ -11351,7 +11840,7 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
  reply_error:
   trim_cache();
 
-  ldout(cct, 3) << "create(" << path << ", 0" << oct << mode << dec 
+  ldout(cct, 8) << "create(" << path << ", 0" << oct << mode << dec
                << " layout " << stripe_unit
                << ' ' << stripe_count
                << ' ' << object_size
@@ -11367,7 +11856,7 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
 int Client::_mkdir(Inode *dir, const char *name, mode_t mode, const UserPerm& perm,
                   InodeRef *inp)
 {
-  ldout(cct, 3) << "_mkdir(" << dir->ino << " " << name << ", 0" << oct
+  ldout(cct, 8) << "_mkdir(" << dir->ino << " " << name << ", 0" << oct
                << mode << dec << ", uid " << perm.uid()
                << ", gid " << perm.gid() << ")" << dendl;
 
@@ -11412,7 +11901,7 @@ int Client::_mkdir(Inode *dir, const char *name, mode_t mode, const UserPerm& pe
 
   trim_cache();
 
-  ldout(cct, 3) << "_mkdir(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
+  ldout(cct, 8) << "_mkdir(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11425,6 +11914,9 @@ int Client::ll_mkdir(Inode *parent, const char *name, mode_t mode,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
 
   ldout(cct, 3) << "ll_mkdir " << vparent << " " << name << dendl;
@@ -11458,6 +11950,9 @@ int Client::ll_mkdirx(Inode *parent, const char *name, mode_t mode, Inode **out,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
 
   ldout(cct, 3) << "ll_mkdirx " << vparent << " " << name << dendl;
@@ -11491,7 +11986,7 @@ int Client::ll_mkdirx(Inode *parent, const char *name, mode_t mode, Inode **out,
 int Client::_symlink(Inode *dir, const char *name, const char *target,
                     const UserPerm& perms, InodeRef *inp)
 {
-  ldout(cct, 3) << "_symlink(" << dir->ino << " " << name << ", " << target
+  ldout(cct, 8) << "_symlink(" << dir->ino << " " << name << ", " << target
                << ", uid " << perms.uid() << ", gid " << perms.gid() << ")"
                << dendl;
 
@@ -11525,7 +12020,7 @@ int Client::_symlink(Inode *dir, const char *name, const char *target,
   res = make_request(req, perms, inp);
 
   trim_cache();
-  ldout(cct, 3) << "_symlink(\"" << path << "\", \"" << target << "\") = " <<
+  ldout(cct, 8) << "_symlink(\"" << path << "\", \"" << target << "\") = " <<
     res << dendl;
   return res;
 
@@ -11539,6 +12034,9 @@ int Client::ll_symlink(Inode *parent, const char *name, const char *value,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
 
   ldout(cct, 3) << "ll_symlink " << vparent << " " << name << " -> " << value
@@ -11573,6 +12071,9 @@ int Client::ll_symlinkx(Inode *parent, const char *name, const char *value,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
 
   ldout(cct, 3) << "ll_symlinkx " << vparent << " " << name << " -> " << value
@@ -11603,7 +12104,7 @@ int Client::ll_symlinkx(Inode *parent, const char *name, const char *value,
 
 int Client::_unlink(Inode *dir, const char *name, const UserPerm& perm)
 {
-  ldout(cct, 3) << "_unlink(" << dir->ino << " " << name
+  ldout(cct, 8) << "_unlink(" << dir->ino << " " << name
                << " uid " << perm.uid() << " gid " << perm.gid()
                << ")" << dendl;
 
@@ -11619,8 +12120,9 @@ int Client::_unlink(Inode *dir, const char *name, const UserPerm& perm)
   req->set_filepath(path);
 
   InodeRef otherin;
-
+  Inode *in;
   Dentry *de;
+
   int res = get_or_create(dir, name, &de);
   if (res < 0)
     goto fail;
@@ -11631,7 +12133,10 @@ int Client::_unlink(Inode *dir, const char *name, const UserPerm& perm)
   res = _lookup(dir, name, 0, &otherin, perm);
   if (res < 0)
     goto fail;
-  req->set_other_inode(otherin.get());
+
+  in = otherin.get();
+  req->set_other_inode(in);
+  in->break_all_delegs();
   req->other_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
 
   req->set_inode(dir);
@@ -11639,7 +12144,7 @@ int Client::_unlink(Inode *dir, const char *name, const UserPerm& perm)
   res = make_request(req, perm);
 
   trim_cache();
-  ldout(cct, 3) << "unlink(" << path << ") = " << res << dendl;
+  ldout(cct, 8) << "unlink(" << path << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11651,6 +12156,9 @@ int Client::ll_unlink(Inode *in, const char *name, const UserPerm& perm)
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_unlink " << vino << " " << name << dendl;
@@ -11668,14 +12176,15 @@ int Client::ll_unlink(Inode *in, const char *name, const UserPerm& perm)
 
 int Client::_rmdir(Inode *dir, const char *name, const UserPerm& perms)
 {
-  ldout(cct, 3) << "_rmdir(" << dir->ino << " " << name << " uid "
+  ldout(cct, 8) << "_rmdir(" << dir->ino << " " << name << " uid "
                << perms.uid() << " gid " << perms.gid() << ")" << dendl;
 
   if (dir->snapid != CEPH_NOSNAP && dir->snapid != CEPH_SNAPDIR) {
     return -EROFS;
   }
-
-  MetaRequest *req = new MetaRequest(dir->snapid == CEPH_SNAPDIR ? CEPH_MDS_OP_RMSNAP:CEPH_MDS_OP_RMDIR);
+  
+  int op = dir->snapid == CEPH_SNAPDIR ? CEPH_MDS_OP_RMSNAP : CEPH_MDS_OP_RMDIR;
+  MetaRequest *req = new MetaRequest(op);
   filepath path;
   dir->make_nosnap_relative_path(path);
   path.push_dentry(name);
@@ -11691,22 +12200,27 @@ int Client::_rmdir(Inode *dir, const char *name, const UserPerm& perms)
   int res = get_or_create(dir, name, &de);
   if (res < 0)
     goto fail;
+  if (op == CEPH_MDS_OP_RMDIR) 
+    req->set_dentry(de);
+  else
+    de->get();
+
   res = _lookup(dir, name, 0, &in, perms);
   if (res < 0)
     goto fail;
-  if (req->get_op() == CEPH_MDS_OP_RMDIR) {
+  if (op == CEPH_MDS_OP_RMDIR) {
     req->set_inode(dir);
-    req->set_dentry(de);
     req->set_other_inode(in.get());
   } else {
     unlink(de, true, true);
+    de->put();
     req->set_other_inode(in.get());
   }
 
   res = make_request(req, perms);
 
   trim_cache();
-  ldout(cct, 3) << "rmdir(" << path << ") = " << res << dendl;
+  ldout(cct, 8) << "rmdir(" << path << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11718,6 +12232,9 @@ int Client::ll_rmdir(Inode *in, const char *name, const UserPerm& perms)
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_rmdir " << vino << " " << name << dendl;
@@ -11736,7 +12253,7 @@ int Client::ll_rmdir(Inode *in, const char *name, const UserPerm& perms)
 
 int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const char *toname, const UserPerm& perm)
 {
-  ldout(cct, 3) << "_rename(" << fromdir->ino << " " << fromname << " to "
+  ldout(cct, 8) << "_rename(" << fromdir->ino << " " << fromname << " to "
                << todir->ino << " " << toname
                << " uid " << perm.uid() << " gid " << perm.gid() << ")"
                << dendl;
@@ -11795,15 +12312,26 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     res = _lookup(fromdir, fromname, 0, &oldin, perm);
     if (res < 0)
       goto fail;
-    req->set_old_inode(oldin.get());
+
+    Inode *oldinode = oldin.get();
+    oldinode->break_all_delegs();
+    req->set_old_inode(oldinode);
     req->old_inode_drop = CEPH_CAP_LINK_SHARED;
 
     res = _lookup(todir, toname, 0, &otherin, perm);
-    if (res != 0 && res != -ENOENT) {
-      goto fail;
-    } else if (res == 0) {
-      req->set_other_inode(otherin.get());
+    switch (res) {
+    case 0:
+      {
+       Inode *in = otherin.get();
+       req->set_other_inode(in);
+       in->break_all_delegs();
+      }
       req->other_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
+      break;
+    case -ENOENT:
+      break;
+    default:
+      goto fail;
     }
 
     req->set_inode(todir);
@@ -11820,7 +12348,7 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
   // renamed item from our cache
 
   trim_cache();
-  ldout(cct, 3) << "_rename(" << from << ", " << to << ") = " << res << dendl;
+  ldout(cct, 8) << "_rename(" << from << ", " << to << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11833,6 +12361,9 @@ int Client::ll_rename(Inode *parent, const char *name, Inode *newparent,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vparent = _get_vino(parent);
   vinodeno_t vnewparent = _get_vino(newparent);
 
@@ -11858,7 +12389,7 @@ int Client::ll_rename(Inode *parent, const char *name, Inode *newparent,
 
 int Client::_link(Inode *in, Inode *dir, const char *newname, const UserPerm& perm, InodeRef *inp)
 {
-  ldout(cct, 3) << "_link(" << in->ino << " to " << dir->ino << " " << newname
+  ldout(cct, 8) << "_link(" << in->ino << " to " << dir->ino << " " << newname
                << " uid " << perm.uid() << " gid " << perm.gid() << ")" << dendl;
 
   if (strlen(newname) > NAME_MAX)
@@ -11871,6 +12402,7 @@ int Client::_link(Inode *in, Inode *dir, const char *newname, const UserPerm& pe
     return -EDQUOT;
   }
 
+  in->break_all_delegs();
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LINK);
 
   filepath path(newname, dir->ino);
@@ -11892,7 +12424,7 @@ int Client::_link(Inode *in, Inode *dir, const char *newname, const UserPerm& pe
   ldout(cct, 10) << "link result is " << res << dendl;
 
   trim_cache();
-  ldout(cct, 3) << "link(" << existing << ", " << path << ") = " << res << dendl;
+  ldout(cct, 8) << "link(" << existing << ", " << path << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11905,6 +12437,9 @@ int Client::ll_link(Inode *in, Inode *newparent, const char *newname,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
   vinodeno_t vnewparent = _get_vino(newparent);
 
@@ -11943,6 +12478,7 @@ int Client::ll_num_osds(void)
 int Client::ll_osdaddr(int osd, uint32_t *addr)
 {
   Mutex::Locker lock(client_lock);
+
   entity_addr_t g;
   bool exists = objecter->with_osdmap([&](const OSDMap& o) {
       if (!o.exists(osd))
@@ -11956,6 +12492,7 @@ int Client::ll_osdaddr(int osd, uint32_t *addr)
   *addr = ntohl(nb_addr);
   return 0;
 }
+
 uint32_t Client::ll_stripe_unit(Inode *in)
 {
   Mutex::Locker lock(client_lock);
@@ -11991,7 +12528,8 @@ int Client::ll_get_stripe_osd(Inode *in, uint64_t blockno,
                              file_layout_t* layout)
 {
   Mutex::Locker lock(client_lock);
-  inodeno_t ino = ll_get_inodeno(in);
+
+  inodeno_t ino = in->ino;
   uint32_t object_size = layout->object_size;
   uint32_t su = layout->stripe_unit;
   uint32_t stripe_count = layout->stripe_count;
@@ -12032,6 +12570,9 @@ int Client::ll_opendir(Inode *in, int flags, dir_result_t** dirpp,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_opendir " << vino << dendl;
@@ -12058,6 +12599,10 @@ int Client::ll_releasedir(dir_result_t *dirp)
   ldout(cct, 3) << "ll_releasedir " << dirp << dendl;
   tout(cct) << "ll_releasedir" << std::endl;
   tout(cct) << (unsigned long)dirp << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
   _closedir(dirp);
   return 0;
 }
@@ -12069,6 +12614,9 @@ int Client::ll_fsyncdir(dir_result_t *dirp)
   tout(cct) << "ll_fsyncdir" << std::endl;
   tout(cct) << (unsigned long)dirp << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _fsync(dirp->inode.get(), false);
 }
 
@@ -12078,6 +12626,9 @@ int Client::ll_open(Inode *in, int flags, Fh **fhp, const UserPerm& perms)
 
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   vinodeno_t vino = _get_vino(in);
 
   ldout(cct, 3) << "ll_open " << vino << " " << ceph_flags_sys2wire(flags) << dendl;
@@ -12113,7 +12664,7 @@ int Client::_ll_create(Inode *parent, const char *name, mode_t mode,
 
   vinodeno_t vparent = _get_vino(parent);
 
-  ldout(cct, 3) << "_ll_create " << vparent << " " << name << " 0" << oct <<
+  ldout(cct, 8) << "_ll_create " << vparent << " " << name << " 0" << oct <<
     mode << dec << " " << ceph_flags_sys2wire(flags) << ", uid " << perms.uid()
                << ", gid " << perms.gid() << dendl;
   tout(cct) << "ll_create" << std::endl;
@@ -12180,7 +12731,7 @@ out:
 
   tout(cct) << (unsigned long)*fhp << std::endl;
   tout(cct) << ino << std::endl;
-  ldout(cct, 3) << "_ll_create " << vparent << " " << name << " 0" << oct <<
+  ldout(cct, 8) << "_ll_create " << vparent << " " << name << " 0" << oct <<
     mode << dec << " " << ceph_flags_sys2wire(flags) << " = " << r << " (" <<
     *fhp << " " << hex << ino << dec << ")" << dendl;
 
@@ -12194,6 +12745,9 @@ int Client::ll_create(Inode *parent, const char *name, mode_t mode,
   Mutex::Locker lock(client_lock);
   InodeRef in;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int r = _ll_create(parent, name, mode, flags, &in, CEPH_STAT_CAP_INODE_ALL,
                      fhp, perms);
   if (r >= 0) {
@@ -12221,6 +12775,8 @@ int Client::ll_createx(Inode *parent, const char *name, mode_t mode,
   Mutex::Locker lock(client_lock);
   InodeRef in;
 
+  if (unmounting)
+    return -ENOTCONN;
 
   int r = _ll_create(parent, name, mode, oflags, &in, caps, fhp, perms);
   if (r >= 0) {
@@ -12247,6 +12803,9 @@ loff_t Client::ll_lseek(Fh *fh, loff_t offset, int whence)
   tout(cct) << offset << std::endl;
   tout(cct) << whence << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _lseek(fh, offset, whence);
 }
 
@@ -12259,6 +12818,9 @@ int Client::ll_read(Fh *fh, loff_t off, loff_t len, bufferlist *bl)
   tout(cct) << off << std::endl;
   tout(cct) << len << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _read(fh, off, len, bl);
 }
 
@@ -12269,7 +12831,11 @@ int Client::ll_read_block(Inode *in, uint64_t blockid,
                          file_layout_t* layout)
 {
   Mutex::Locker lock(client_lock);
-  vinodeno_t vino = ll_get_vino(in);
+
+  if (unmounting)
+    return -ENOTCONN;
+
+  vinodeno_t vino = _get_vino(in);
   object_t oid = file_object_t(vino.ino, blockid);
   C_SaferCond onfinish;
   bufferlist bl;
@@ -12308,7 +12874,7 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
   Cond cond;
   bool done;
   int r = 0;
-  Context *onsafe;
+  Context *onsafe = nullptr;
 
   if (length == 0) {
     return -EINVAL;
@@ -12340,6 +12906,11 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
 
   /* lock just in time */
   client_lock.Lock();
+  if (unmounting) {
+    client_lock.Unlock();
+    delete onsafe;
+    return -ENOTCONN;
+  }
 
   objecter->write(oid,
                  object_locator_t(layout->pool_id),
@@ -12373,7 +12944,7 @@ int Client::ll_commit_blocks(Inode *in,
     Mutex::Locker lock(client_lock);
     /*
     BarrierContext *bctx;
-    vinodeno_t vino = ll_get_vino(in);
+    vinodeno_t vino = _get_vino(in);
     uint64_t ino = vino.ino;
 
     ldout(cct, 1) << "ll_commit_blocks for " << vino.ino << " from "
@@ -12402,6 +12973,9 @@ int Client::ll_write(Fh *fh, loff_t off, loff_t len, const char *data)
   tout(cct) << off << std::endl;
   tout(cct) << len << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int r = _write(fh, off, len, data, NULL, 0);
   ldout(cct, 3) << "ll_write " << fh << " " << off << "~" << len << " = " << r
                << dendl;
@@ -12415,6 +12989,9 @@ int Client::ll_flush(Fh *fh)
   tout(cct) << "ll_flush" << std::endl;
   tout(cct) << (unsigned long)fh << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _flush(fh);
 }
 
@@ -12425,6 +13002,9 @@ int Client::ll_fsync(Fh *fh, bool syncdataonly)
   tout(cct) << "ll_fsync" << std::endl;
   tout(cct) << (unsigned long)fh << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   int r = _fsync(fh, syncdataonly);
   if (r) {
     // If we're returning an error, clear it from the FH
@@ -12433,6 +13013,19 @@ int Client::ll_fsync(Fh *fh, bool syncdataonly)
   return r;
 }
 
+int Client::ll_sync_inode(Inode *in, bool syncdataonly)
+{
+  Mutex::Locker lock(client_lock);
+  ldout(cct, 3) << "ll_sync_inode " << *in << " " << dendl;
+  tout(cct) << "ll_sync_inode" << std::endl;
+  tout(cct) << (unsigned long)in << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
+  return _fsync(in, syncdataonly);
+}
+
 #ifdef FALLOC_FL_PUNCH_HOLE
 
 int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
@@ -12460,9 +13053,10 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     return -EBADF;
 
   uint64_t size = offset + length;
+  std::list<InodeRef> quota_roots;
   if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
       size > in->size &&
-      is_quota_bytes_exceeded(in, size - in->size, fh->actor_perms)) {
+      is_quota_bytes_exceeded(in, size - in->size, fh->actor_perms, &quota_roots)) {
     return -EDQUOT;
   }
 
@@ -12495,9 +13089,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
         in->inline_data = bl;
         in->inline_version++;
       }
-      in->mtime = ceph_clock_now();
+      in->mtime = in->ctime = ceph_clock_now();
       in->change_attr++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
     } else {
       if (in->inline_version < CEPH_INLINE_NONE) {
         onuninline = new C_SafeCond(&uninline_flock,
@@ -12521,9 +13115,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
                  offset, length,
                  ceph::real_clock::now(),
                  0, true, onfinish);
-      in->mtime = ceph_clock_now();
+      in->mtime = in->ctime = ceph_clock_now();
       in->change_attr++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
       client_lock.Unlock();
       flock.Lock();
@@ -12537,11 +13131,11 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     uint64_t size = offset + length;
     if (size > in->size) {
       in->size = size;
-      in->mtime = ceph_clock_now();
+      in->mtime = in->ctime = ceph_clock_now();
       in->change_attr++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-      if (is_quota_bytes_approaching(in, fh->actor_perms)) {
+      if (is_quota_bytes_approaching(in, quota_roots)) {
         check_caps(in, CHECK_CAPS_NODELAY);
       } else if (is_max_size_approaching(in)) {
        check_caps(in, 0);
@@ -12560,7 +13154,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
       r = uninline_ret;
@@ -12586,6 +13180,9 @@ int Client::ll_fallocate(Fh *fh, int mode, loff_t offset, loff_t length)
   tout(cct) << "ll_fallocate " << mode << " " << offset << " " << length << std::endl;
   tout(cct) << (unsigned long)fh << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _fallocate(fh, mode, offset, length);
 }
 
@@ -12594,6 +13191,9 @@ int Client::fallocate(int fd, int mode, loff_t offset, loff_t length)
   Mutex::Locker lock(client_lock);
   tout(cct) << "fallocate " << " " << fd << mode << " " << offset << " " << length << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *fh = get_filehandle(fd);
   if (!fh)
     return -EBADF;
@@ -12607,6 +13207,10 @@ int Client::fallocate(int fd, int mode, loff_t offset, loff_t length)
 int Client::ll_release(Fh *fh)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   ldout(cct, 3) << "ll_release (fh)" << fh << " " << fh->inode->ino << " " <<
     dendl;
   tout(cct) << "ll_release (fh)" << std::endl;
@@ -12624,6 +13228,9 @@ int Client::ll_getlk(Fh *fh, struct flock *fl, uint64_t owner)
   ldout(cct, 3) << "ll_getlk (fh)" << fh << " " << fh->inode->ino << dendl;
   tout(cct) << "ll_getk (fh)" << (unsigned long)fh << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _getlk(fh, fl, owner);
 }
 
@@ -12634,6 +13241,9 @@ int Client::ll_setlk(Fh *fh, struct flock *fl, uint64_t owner, int sleep)
   ldout(cct, 3) << "ll_setlk  (fh) " << fh << " " << fh->inode->ino << dendl;
   tout(cct) << "ll_setk (fh)" << (unsigned long)fh << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _setlk(fh, fl, owner, sleep);
 }
 
@@ -12644,9 +13254,54 @@ int Client::ll_flock(Fh *fh, int cmd, uint64_t owner)
   ldout(cct, 3) << "ll_flock  (fh) " << fh << " " << fh->inode->ino << dendl;
   tout(cct) << "ll_flock (fh)" << (unsigned long)fh << std::endl;
 
+  if (unmounting)
+    return -ENOTCONN;
+
   return _flock(fh, cmd, owner);
 }
 
+int Client::set_deleg_timeout(uint32_t timeout)
+{
+  Mutex::Locker lock(client_lock);
+
+  /*
+   * The whole point is to prevent blacklisting so we must time out the
+   * delegation before the session autoclose timeout kicks in.
+   */
+  if (timeout >= mdsmap->get_session_autoclose())
+    return -EINVAL;
+
+  deleg_timeout = timeout;
+  return 0;
+}
+
+int Client::ll_delegation(Fh *fh, unsigned cmd, ceph_deleg_cb_t cb, void *priv)
+{
+  int ret = -EINVAL;
+
+  Mutex::Locker lock(client_lock);
+
+  if (!mounted)
+    return -ENOTCONN;
+
+  Inode *inode = fh->inode.get();
+
+  switch(cmd) {
+  case CEPH_DELEGATION_NONE:
+    inode->unset_deleg(fh);
+    ret = 0;
+    break;
+  default:
+    try {
+      ret = inode->set_deleg(fh, cmd, cb, priv);
+    } catch (std::bad_alloc) {
+      ret = -ENOMEM;
+    }
+    break;
+  }
+  return ret;
+}
+
 class C_Client_RequestInterrupt : public Context  {
 private:
   Client *client;
@@ -12681,6 +13336,9 @@ int Client::describe_layout(const char *relpath, file_layout_t *lp,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   filepath path(relpath);
   InodeRef in;
   int r = path_walk(path, &in, perms);
@@ -12697,6 +13355,9 @@ int Client::fdescribe_layout(int fd, file_layout_t *lp)
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -12708,12 +13369,26 @@ int Client::fdescribe_layout(int fd, file_layout_t *lp)
   return 0;
 }
 
+int64_t Client::get_default_pool_id()
+{
+  Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
+  /* first data pool is the default */ 
+  return mdsmap->get_first_data_pool(); 
+}
 
 // expose osdmap
 
 int64_t Client::get_pool_id(const char *pool_name)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   return objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
                               pool_name);
 }
@@ -12721,6 +13396,10 @@ int64_t Client::get_pool_id(const char *pool_name)
 string Client::get_pool_name(int64_t pool)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return string();
+
   return objecter->with_osdmap([pool](const OSDMap& o) {
       return o.have_pg_pool(pool) ? o.get_pool_name(pool) : string();
     });
@@ -12729,6 +13408,10 @@ string Client::get_pool_name(int64_t pool)
 int Client::get_pool_replication(int64_t pool)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   return objecter->with_osdmap([pool](const OSDMap& o) {
       return o.have_pg_pool(pool) ? o.get_pg_pool(pool)->get_size() : -ENOENT;
     });
@@ -12738,6 +13421,9 @@ int Client::get_file_extent_osds(int fd, loff_t off, loff_t *len, vector<int>& o
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -12780,6 +13466,10 @@ int Client::get_file_extent_osds(int fd, loff_t off, loff_t *len, vector<int>& o
 int Client::get_osd_crush_location(int id, vector<pair<string, string> >& path)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   if (id < 0)
     return -EINVAL;
   return objecter->with_osdmap([&](const OSDMap& o) {
@@ -12792,6 +13482,9 @@ int Client::get_file_stripe_address(int fd, loff_t offset,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -12821,6 +13514,10 @@ int Client::get_file_stripe_address(int fd, loff_t offset,
 int Client::get_osd_addr(int osd, entity_addr_t& addr)
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   return objecter->with_osdmap([&](const OSDMap& o) {
       if (!o.exists(osd))
        return -ENOENT;
@@ -12835,6 +13532,9 @@ int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
 {
   Mutex::Locker lock(client_lock);
 
+  if (unmounting)
+    return -ENOTCONN;
+
   Fh *f = get_filehandle(fd);
   if (!f)
     return -EBADF;
@@ -12848,12 +13548,14 @@ int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
 }
 
 
-/*
- * find an osd with the same ip.  -1 if none.
- */
+/* find an osd with the same ip.  -ENXIO if none. */
 int Client::get_local_osd()
 {
   Mutex::Locker lock(client_lock);
+
+  if (unmounting)
+    return -ENOTCONN;
+
   objecter->with_osdmap([this](const OSDMap& o) {
       if (o.get_epoch() != local_osd_epoch) {
        local_osd = o.find_osd_on_ip(messenger->get_myaddr());
@@ -12900,6 +13602,7 @@ void Client::ms_handle_remote_reset(Connection *con)
        }
       }
       if (mds >= 0) {
+       assert (s != NULL);
        switch (s->state) {
        case MetaSession::STATE_CLOSING:
          ldout(cct, 1) << "reset from mds we were closing; we'll call that closed" << dendl;
@@ -12919,6 +13622,7 @@ void Client::ms_handle_remote_reset(Connection *con)
 
        case MetaSession::STATE_OPEN:
          {
+           objecter->maybe_request_map(); /* to check if we are blacklisted */
            const md_config_t *conf = cct->_conf;
            if (conf->client_reconnect_stale) {
              ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
@@ -12994,6 +13698,12 @@ Inode *Client::get_quota_root(Inode *in, const UserPerm& perms)
     if (cur == root_ancestor)
       break;
 
+    // deleted inode
+    if (cur->nlink == 0) {
+      cur = root_ancestor;
+      break;
+    }
+
     MetaRequest *req = new MetaRequest(CEPH_MDS_OP_LOOKUPNAME);
     filepath path(cur->ino);
     req->set_filepath(path);
@@ -13055,32 +13765,34 @@ bool Client::is_quota_files_exceeded(Inode *in, const UserPerm& perms)
 }
 
 bool Client::is_quota_bytes_exceeded(Inode *in, int64_t new_bytes,
-                                    const UserPerm& perms)
+                                    const UserPerm& perms,
+                                    std::list<InodeRef>* quota_roots)
 {
   return check_quota_condition(in, perms,
-      [&new_bytes](const Inode &in) {
+      [&new_bytes, quota_roots](const Inode &in) {
+       if (quota_roots)
+         quota_roots->emplace_back(const_cast<Inode*>(&in));
         return in.quota.max_bytes && (in.rstat.rbytes + new_bytes)
                > in.quota.max_bytes;
       });
 }
 
-bool Client::is_quota_bytes_approaching(Inode *in, const UserPerm& perms)
+bool Client::is_quota_bytes_approaching(Inode *in, std::list<InodeRef>& quota_roots)
 {
-  return check_quota_condition(in, perms,
-      [](const Inode &in) {
-        if (in.quota.max_bytes) {
-          if (in.rstat.rbytes >= in.quota.max_bytes) {
-            return true;
-          }
-
-          assert(in.size >= in.reported_size);
-          const uint64_t space = in.quota.max_bytes - in.rstat.rbytes;
-          const uint64_t size = in.size - in.reported_size;
-          return (space >> 4) < size;
-        } else {
-          return false;
-        }
-      });
+  assert(in->size >= in->reported_size);
+  const uint64_t size = in->size - in->reported_size;
+
+  for (auto& diri : quota_roots) {
+    if (diri->quota.max_bytes) {
+      if (diri->rstat.rbytes >= diri->quota.max_bytes)
+       return true;
+
+      uint64_t space = diri->quota.max_bytes - diri->rstat.rbytes;
+      if ((space >> 4) < size)
+       return true;
+    }
+  }
+  return false;
 }
 
 enum {
@@ -13313,6 +14025,8 @@ const char** Client::get_tracked_conf_keys() const
     "client_cache_size",
     "client_cache_mid",
     "client_acl_type",
+    "client_deleg_timeout",
+    "client_deleg_break_on_open",
     NULL
   };
   return keys;
@@ -13323,9 +14037,7 @@ void Client::handle_conf_change(const struct md_config_t *conf,
 {
   Mutex::Locker lock(client_lock);
 
-  if (changed.count("client_cache_size") ||
-      changed.count("client_cache_mid")) {
-    lru.lru_set_max(cct->_conf->client_cache_size);
+  if (changed.count("client_cache_mid")) {
     lru.lru_set_midpoint(cct->_conf->client_cache_mid);
   }
   if (changed.count("client_acl_type")) {
@@ -13335,13 +14047,6 @@ void Client::handle_conf_change(const struct md_config_t *conf,
   }
 }
 
-void Client::init_groups(UserPerm *perms)
-{
-  gid_t *sgids;
-  int count = _getgrouplist(&sgids, perms->uid(), perms->gid());
-  perms->init_gids(sgids, count);
-}
-
 void intrusive_ptr_add_ref(Inode *in)
 {
   in->get();
@@ -13418,4 +14123,3 @@ void StandaloneClient::shutdown()
   objecter->shutdown();
   monclient->shutdown();
 }
-