]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/client/Client.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / client / Client.cc
index 7c3f117a803dd281cf9ca25a04c4187604b15cae..2b7db5a894d1cc325e2fd2a3d2d7a114d24484e1 100644 (file)
@@ -72,6 +72,7 @@
 
 #include "mds/flock.h"
 #include "mds/cephfs_features.h"
+#include "mds/snap.h"
 #include "osd/OSDMap.h"
 #include "osdc/Filer.h"
 
@@ -1209,6 +1210,11 @@ Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dl
     Inode *diri = dir->parent_inode;
     clear_dir_complete_and_ordered(diri, false);
     dn = link(dir, dname, in, dn);
+
+    if (old_dentry) {
+      dn->is_renaming = false;
+      signal_cond_list(waiting_for_rename);
+    }
   }
 
   update_dentry_lease(dn, dlease, from, session);
@@ -1292,7 +1298,8 @@ void Client::clear_dir_complete_and_ordered(Inode *diri, bool complete)
 /*
  * insert results from readdir or lssnap into the metadata cache.
  */
-void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri) {
+void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
+                                    Inode *diri, Inode *diri_other) {
 
   auto& reply = request->reply;
   ConnectionRef con = request->reply->get_connection();
@@ -1307,7 +1314,8 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
   dir_result_t *dirp = request->dirp;
   ceph_assert(dirp);
 
-  // the extra buffer list is only set for readdir and lssnap replies
+  // the extra buffer list is only set for readdir, lssnap and
+  // readdir_snapdiff replies
   auto p = reply->get_extra_bl().cbegin();
   if (!p.end()) {
     // snapdir?
@@ -1315,10 +1323,27 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
       ceph_assert(diri);
       diri = open_snapdir(diri);
     }
+    bool snapdiff_req = request->head.op == CEPH_MDS_OP_READDIR_SNAPDIFF;
+    frag_t fg;
+    unsigned offset_hash;
+    if (snapdiff_req) {
+      fg = (unsigned)request->head.args.snapdiff.frag;
+      offset_hash = (unsigned)request->head.args.snapdiff.offset_hash;
+    } else {
+      fg = (unsigned)request->head.args.readdir.frag;
+      offset_hash = (unsigned)request->head.args.readdir.offset_hash;
+    }
 
     // only open dir if we're actually adding stuff to it!
     Dir *dir = diri->open_dir();
     ceph_assert(dir);
+    //open opponent dir for snapdiff if any
+    Dir *dir_other = nullptr;
+    if (snapdiff_req) {
+      ceph_assert(diri_other);
+      dir_other = diri_other->open_dir();
+      ceph_assert(dir_other);
+    }
 
     // dirstat
     DirStat dst(p, features);
@@ -1330,7 +1355,6 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
     bool end = ((unsigned)flags & CEPH_READDIR_FRAG_END);
     bool hash_order = ((unsigned)flags & CEPH_READDIR_HASH_ORDER);
 
-    frag_t fg = (unsigned)request->head.args.readdir.frag;
     unsigned readdir_offset = dirp->next_offset;
     string readdir_start = dirp->last_name;
     ceph_assert(!readdir_start.empty() || readdir_offset == 2);
@@ -1341,7 +1365,7 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
        last_hash = ceph_frag_value(diri->hash_dentry_name(readdir_start));
       } else if (flags & CEPH_READDIR_OFFSET_HASH) {
        /* mds understands offset_hash */
-       last_hash = (unsigned)request->head.args.readdir.offset_hash;
+       last_hash = offset_hash;
       }
     }
 
@@ -1386,13 +1410,22 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
 
       Inode *in = add_update_inode(&ist, request->sent_stamp, session,
                                   request->perms);
+      auto *effective_dir = dir;
+      auto *effective_diri = diri;
+
+      if (snapdiff_req && in->snapid != diri->snapid) {
+        ceph_assert(diri_other);
+        ceph_assert(dir_other);
+        effective_diri = diri_other;
+        effective_dir = dir_other;
+      }
       Dentry *dn;
-      if (diri->dir->dentries.count(dname)) {
-       Dentry *olddn = diri->dir->dentries[dname];
+      if (effective_dir->dentries.count(dname)) {
+       Dentry *olddn = effective_dir->dentries[dname];
        if (olddn->inode != in) {
          // replace incorrect dentry
          unlink(olddn, true, true);  // keep dir, dentry
-         dn = link(dir, dname, in, olddn);
+         dn = link(effective_dir, dname, in, olddn);
          ceph_assert(dn == olddn);
        } else {
          // keep existing dn
@@ -1401,13 +1434,13 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
        }
       } else {
        // new dn
-       dn = link(dir, dname, in, NULL);
+       dn = link(effective_dir, dname, in, NULL);
       }
       dn->alternate_name = std::move(dlease.alternate_name);
 
       update_dentry_lease(dn, &dlease, request->sent_stamp, session);
       if (hash_order) {
-       unsigned hash = ceph_frag_value(diri->hash_dentry_name(dname));
+       unsigned hash = ceph_frag_value(effective_diri->hash_dentry_name(dname));
        if (hash != last_hash)
          readdir_offset = 2;
        last_hash = hash;
@@ -1416,20 +1449,21 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
        dn->offset = dir_result_t::make_fpos(fg, readdir_offset++, false);
       }
       // add to readdir cache
-      if (dirp->release_count == diri->dir_release_count &&
-         dirp->ordered_count == diri->dir_ordered_count &&
-         dirp->start_shared_gen == diri->shared_gen) {
-       if (dirp->cache_index == dir->readdir_cache.size()) {
+      if (!snapdiff_req &&
+          dirp->release_count == effective_diri->dir_release_count &&
+         dirp->ordered_count == effective_diri->dir_ordered_count &&
+         dirp->start_shared_gen == effective_diri->shared_gen) {
+       if (dirp->cache_index == effective_dir->readdir_cache.size()) {
          if (i == 0) {
            ceph_assert(!dirp->inode->is_complete_and_ordered());
            dir->readdir_cache.reserve(dirp->cache_index + numdn);
          }
-         dir->readdir_cache.push_back(dn);
-       } else if (dirp->cache_index < dir->readdir_cache.size()) {
+          effective_dir->readdir_cache.push_back(dn);
+       } else if (dirp->cache_index < effective_dir->readdir_cache.size()) {
          if (dirp->inode->is_complete_and_ordered())
-           ceph_assert(dir->readdir_cache[dirp->cache_index] == dn);
+           ceph_assert(effective_dir->readdir_cache[dirp->cache_index] == dn);
          else
-           dir->readdir_cache[dirp->cache_index] = dn;
+            effective_dir->readdir_cache[dirp->cache_index] = dn;
        } else {
          ceph_abort_msg("unexpected readdir buffer idx");
        }
@@ -1449,6 +1483,8 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
 
     if (dir->is_empty())
       close_dir(dir);
+    if (dir_other && dir_other->is_empty())
+      close_dir(dir_other);
   }
 }
 
@@ -1608,10 +1644,20 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
   if (in) {
     if (op == CEPH_MDS_OP_READDIR ||
        op == CEPH_MDS_OP_LSSNAP) {
-      insert_readdir_results(request, session, in);
+      insert_readdir_results(request,
+       session,
+       in,
+       nullptr);
     } else if (op == CEPH_MDS_OP_LOOKUPNAME) {
       // hack: return parent inode instead
       in = diri;
+    } else if (op == CEPH_MDS_OP_READDIR_SNAPDIFF) {
+      // provide both request's inode (aka snapA) and traced one (snapB)
+      // to properly match snapdiff results
+      insert_readdir_results(request,
+       session,
+       request->inode(),
+       in);
     }
 
     if (request->dentry() == NULL && in != request->inode()) {
@@ -1685,7 +1731,7 @@ mds_rank_t Client::choose_target_mds(MetaRequest *req, Inode** phash_diri)
            * I think the MDS should be able to redirect as needed*/
          in = in->get_first_parent()->dir->parent_inode;
         else {
-          ldout(cct, 10) << "got unlinked inode, can't look at parent" << dendl;
+          ldout(cct, 10) << __func__ << "got unlinked inode, can't look at parent" << dendl;
           break;
         }
       }
@@ -2333,6 +2379,12 @@ void Client::_closed_mds_session(MetaSession *s, int err, bool rejected)
     mds_sessions.erase(s->mds_num);
 }
 
+static void reinit_mds_features(MetaSession *session,
+                               const MConstRef<MClientSession>& m) {
+  session->mds_features = std::move(m->supported_features);
+  session->mds_metric_flags = std::move(m->metric_spec.metric_flags);
+}
+
 void Client::handle_client_session(const MConstRef<MClientSession>& m)
 {
   mds_rank_t from = mds_rank_t(m->get_source().num());
@@ -2351,6 +2403,13 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
       if (session->state == MetaSession::STATE_OPEN) {
         ldout(cct, 10) << "mds." << from << " already opened, ignore it"
                        << dendl;
+       // The MDS could send a client_session(open) message even when
+       // the session state is STATE_OPEN. Normally, its fine to
+       // ignore this message, but, if the MDS sent this message just
+       // after it got upgraded, the MDS feature bits could differ
+       // than the one before the upgrade - so, refresh the feature
+       // bits the client holds.
+       reinit_mds_features(session.get(), m);
         return;
       }
       /*
@@ -2360,8 +2419,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
       if (!session->seq && m->get_seq())
         session->seq = m->get_seq();
 
-      session->mds_features = std::move(m->supported_features);
-      session->mds_metric_flags = std::move(m->metric_spec.metric_flags);
+      reinit_mds_features(session.get(), m);
 
       renew_caps(session.get());
       session->state = MetaSession::STATE_OPEN;
@@ -2546,7 +2604,7 @@ ref_t<MClientRequest> Client::build_client_request(MetaRequest *request, mds_ran
     }
   }
 
-  auto req = make_message<MClientRequest>(request->get_op(), old_version);
+  auto req = make_message<MClientRequest>(request->get_op(), session->mds_features);
   req->set_tid(request->tid);
   req->set_stamp(request->op_stamp);
   memcpy(&req->head, &request->head, sizeof(ceph_mds_request_head));
@@ -3421,12 +3479,17 @@ Dentry* Client::link(Dir *dir, const string& name, Inode *in, Dentry *dn)
 
     lru.lru_insert_mid(dn);    // mid or top?
 
-    ldout(cct, 15) << "link dir " << dir->parent_inode << " '" << name << "' to inode " << in
-                  << " dn " << dn << " (new dn)" << dendl;
+    if(in) {
+      ldout(cct, 15) << "link dir " << *dir->parent_inode << " '" << name << "' to inode " << *in
+                    << " dn " << *dn << " (new dn)" << dendl;
+    } else {
+      ldout(cct, 15) << "link dir " << *dir->parent_inode << " '" << name << "' "
+        << " dn " << *dn << " (new dn)" << dendl;
+    }
   } else {
     ceph_assert(!dn->inode);
-    ldout(cct, 15) << "link dir " << dir->parent_inode << " '" << name << "' to inode " << in
-                  << " dn " << dn << " (old dn)" << dendl;
+    ldout(cct, 15) << "link dir " << *dir->parent_inode << " '" << name << "' to inode " << in
+                  << " dn " << *dn << " (old dn)" << dendl;
   }
 
   if (in) {    // link to inode
@@ -3529,7 +3592,7 @@ void Client::put_cap_ref(Inode *in, int cap)
     int put_nref = 0;
     int drop = last & ~in->caps_issued();
     if (in->snapid == CEPH_NOSNAP) {
-      if ((last & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER)) &&
+      if ((last & CEPH_CAP_FILE_WR) &&
          !in->cap_snaps.empty() &&
          in->cap_snaps.rbegin()->second.writing) {
        ldout(cct, 10) << __func__ << " finishing pending cap_snap on " << *in << dendl;
@@ -3543,6 +3606,10 @@ void Client::put_cap_ref(Inode *in, int cap)
        signal_cond_list(in->waitfor_commit);
        ldout(cct, 5) << __func__ << " dropped last FILE_BUFFER ref on " << *in << dendl;
        ++put_nref;
+
+       if (!in->cap_snaps.empty()) {
+         flush_snaps(in);
+       }
       }
     }
     if (last & CEPH_CAP_FILE_CACHE) {
@@ -4003,15 +4070,13 @@ void Client::queue_cap_snap(Inode *in, SnapContext& old_snapc)
       in->cap_snaps.rbegin()->second.writing) {
     ldout(cct, 10) << __func__ << " already have pending cap_snap on " << *in << dendl;
     return;
-  } else if (in->caps_dirty() ||
-            (used & CEPH_CAP_FILE_WR) ||
-            (dirty & CEPH_CAP_ANY_WR)) {
+  } else if (dirty || (used & CEPH_CAP_FILE_WR)) {
     const auto &capsnapem = in->cap_snaps.emplace(std::piecewise_construct, std::make_tuple(old_snapc.seq), std::make_tuple(in));
     ceph_assert(capsnapem.second); /* element inserted */
     CapSnap &capsnap = capsnapem.first->second;
     capsnap.context = old_snapc;
     capsnap.issued = in->caps_issued();
-    capsnap.dirty = in->caps_dirty();
+    capsnap.dirty = dirty;
 
     capsnap.dirty_data = (used & CEPH_CAP_FILE_BUFFER);
 
@@ -4058,9 +4123,11 @@ void Client::finish_cap_snap(Inode *in, CapSnap &capsnap, int used)
   }
 
   if (used & CEPH_CAP_FILE_BUFFER) {
-    capsnap.writing = 1;
     ldout(cct, 10) << __func__ << " " << *in << " cap_snap " << &capsnap << " used " << used
-            << " WRBUFFER, delaying" << dendl;
+            << " WRBUFFER, trigger to flush dirty buffer" << dendl;
+
+    /* trigger to flush the buffer */
+    _flush(in, new C_Client_FlushComplete(this, in));
   } else {
     capsnap.dirty_data = 0;
     flush_snaps(in);
@@ -4412,11 +4479,19 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
   if (flags & CEPH_CAP_FLAG_AUTH) {
     if (in->auth_cap != &cap &&
         (!in->auth_cap || ceph_seq_cmp(in->auth_cap->mseq, mseq) < 0)) {
-      if (in->auth_cap && in->flushing_cap_item.is_on_list()) {
-       ldout(cct, 10) << __func__ << " changing auth cap: "
-                      << "add myself to new auth MDS' flushing caps list" << dendl;
-       adjust_session_flushing_caps(in, in->auth_cap->session, mds_session);
+      if (in->auth_cap) {
+        if (in->flushing_cap_item.is_on_list()) {
+          ldout(cct, 10) << __func__ << " changing auth cap: "
+                         << "add myself to new auth MDS' flushing caps list" << dendl;
+          adjust_session_flushing_caps(in, in->auth_cap->session, mds_session);
+        }
+        if (in->dirty_cap_item.is_on_list()) {
+          ldout(cct, 10) << __func__ << " changing auth cap: "
+                         << "add myself to new auth MDS' dirty caps list" << dendl;
+          mds_session->get_dirty_list().push_back(&in->dirty_cap_item);
+        }
       }
+
       in->auth_cap = &cap;
     }
   }
@@ -5263,24 +5338,48 @@ void Client::handle_caps(const MConstRef<MClientCaps>& m)
 
   got_mds_push(session.get());
 
+  bool do_cap_release = false;
   Inode *in;
   vinodeno_t vino(m->get_ino(), CEPH_NOSNAP);
   if (auto it = inode_map.find(vino); it != inode_map.end()) {
     in = it->second;
+
+    /* MDS maybe waiting for cap release with increased seq */
+    switch (m->get_op()) {
+      case CEPH_CAP_OP_REVOKE:
+      case CEPH_CAP_OP_GRANT:
+        if (!in->caps.count(mds)) {
+         do_cap_release = true;
+          ldout(cct, 5) << __func__ << " vino " << vino << " don't have cap "
+                        << m->get_cap_id() << " op " << m->get_op()
+                        << ", immediately releasing" << dendl;
+       }
+    }
   } else {
-    if (m->get_op() == CEPH_CAP_OP_IMPORT) {
-      ldout(cct, 5) << __func__ << " don't have vino " << vino << " on IMPORT, immediately releasing" << dendl;
-      session->enqueue_cap_release(
-        m->get_ino(),
-        m->get_cap_id(),
-        m->get_seq(),
-        m->get_mseq(),
-        cap_epoch_barrier);
-    } else {
-      ldout(cct, 5) << __func__ << " don't have vino " << vino << ", dropping" << dendl;
+    /* MDS maybe waiting for cap release with increased seq */
+    switch (m->get_op()) {
+      case CEPH_CAP_OP_IMPORT:
+      case CEPH_CAP_OP_REVOKE:
+      case CEPH_CAP_OP_GRANT:
+       do_cap_release = true;
+        ldout(cct, 5) << __func__ << " don't have vino " << vino << " op "
+                      << m->get_op() << ", immediately releasing" << dendl;
+       break;
+      default:
+        ldout(cct, 5) << __func__ << " don't have vino " << vino << ", dropping" << dendl;
+       return;
     }
+  }
+
+  // In case the mds is waiting on e.g. a revocation
+  if (do_cap_release) {
+    session->enqueue_cap_release(
+      m->get_ino(),
+      m->get_cap_id(),
+      m->get_seq(),
+      m->get_mseq(),
+      cap_epoch_barrier);
 
-    // in case the mds is waiting on e.g. a revocation
     flush_cap_releases();
     return;
   }
@@ -5757,6 +5856,13 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, const M
     }
   }
 
+  // just in case the caps was released just before we get the revoke msg
+  if (!check && m->get_op() == CEPH_CAP_OP_REVOKE) {
+    cap->wanted = 0; // don't let check_caps skip sending a response to MDS
+    check = true;
+    flags = CHECK_CAPS_NODELAY;
+  }
+
   if (check)
     check_caps(in, flags);
 
@@ -6888,6 +6994,13 @@ void Client::collect_and_send_global_metrics() {
   ldout(cct, 20) << __func__ << dendl;
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
+  /* Do not send the metrics until the MDS rank is ready */
+  if (!mdsmap->is_active((mds_rank_t)0)) {
+    ldout(cct, 5) << __func__ << " MDS rank 0 is not ready yet -- not sending metric"
+                  << dendl;
+    return;
+  }
+
   if (!have_open_session((mds_rank_t)0)) {
     ldout(cct, 5) << __func__ << ": no session with rank=0 -- not sending metric"
                   << dendl;
@@ -7059,7 +7172,8 @@ bool Client::_dentry_valid(const Dentry *dn)
 }
 
 int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
-                   const UserPerm& perms, std::string* alternate_name)
+                    const UserPerm& perms, std::string* alternate_name,
+                    bool is_rename)
 {
   int r = 0;
   Dentry *dn = NULL;
@@ -7138,6 +7252,19 @@ relookup:
     } else {
       ldout(cct, 20) << " no cap on " << dn->inode->vino() << dendl;
     }
+
+    // In rare case during the rename if another thread tries to
+    // lookup the dst dentry, it may get an inconsistent result
+    // that both src dentry and dst dentry will link to the same
+    // inode at the same time.
+    // Will wait the rename to finish and try it again.
+    if (!is_rename && dn->is_renaming) {
+      ldout(cct, 1) << __func__ << " dir " << *dir
+                    << " rename is on the way, will wait for dn '"
+                    << dname << "'" << dendl;
+      wait_on_list(waiting_for_rename);
+      goto relookup;
+    }
   } else {
     // can we conclude ENOENT locally?
     if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED, true) &&
@@ -9055,7 +9182,8 @@ void Client::_readdir_drop_dirp_buffer(dir_result_t *dirp)
   dirp->buffer.clear();
 }
 
-int Client::_readdir_get_frag(dir_result_t *dirp)
+int Client::_readdir_get_frag(int op, dir_result_t* dirp,
+  fill_readdir_args_cb_t fill_req_cb)
 {
   ceph_assert(dirp);
   ceph_assert(dirp->inode);
@@ -9070,33 +9198,18 @@ int Client::_readdir_get_frag(dir_result_t *dirp)
   ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino << " fg " << fg
                 << " offset " << hex << dirp->offset << dec << dendl;
 
-  int op = CEPH_MDS_OP_READDIR;
-  if (dirp->inode && dirp->inode->snapid == CEPH_SNAPDIR)
-    op = CEPH_MDS_OP_LSSNAP;
-
   InodeRef& diri = dirp->inode;
 
   MetaRequest *req = new MetaRequest(op);
-  filepath path;
-  diri->make_nosnap_relative_path(path);
-  req->set_filepath(path); 
-  req->set_inode(diri.get());
-  req->head.args.readdir.frag = fg;
-  req->head.args.readdir.flags = CEPH_READDIR_REPLY_BITFLAGS;
-  if (dirp->last_name.length()) {
-    req->path2.set_path(dirp->last_name);
-  } else if (dirp->hash_order()) {
-    req->head.args.readdir.offset_hash = dirp->offset_high();
-  }
-  req->dirp = dirp;
-  
+  fill_req_cb(dirp, req, diri, fg);
+
   bufferlist dirbl;
   int res = make_request(req, dirp->perms, NULL, NULL, -1, &dirbl);
   
   if (res == -CEPHFS_EAGAIN) {
     ldout(cct, 10) << __func__ << " got EAGAIN, retrying" << dendl;
     _readdir_rechoose_frag(dirp);
-    return _readdir_get_frag(dirp);
+    return _readdir_get_frag(op, dirp, fill_req_cb);
   }
 
   if (res == 0) {
@@ -9121,7 +9234,8 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
 {
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
   ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
-          << " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
+          << " last_name " << dirp->last_name
+          << " offset " << hex << dirp->offset << dec
           << dendl;
   Dir *dir = dirp->inode->dir;
 
@@ -9210,8 +9324,57 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
   return 0;
 }
 
-int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
-                        unsigned want, unsigned flags, bool getref)
+int Client::readdir_r_cb(dir_result_t* d,
+  add_dirent_cb_t cb,
+  void* p,
+  unsigned want,
+  unsigned flags,
+  bool getref)
+{
+  auto fill_readdir_cb = [](dir_result_t* dirp,
+                           MetaRequest* req,
+                           InodeRef& diri,
+                           frag_t fg) {
+    filepath path;
+    diri->make_nosnap_relative_path(path);
+    req->set_filepath(path);
+    req->set_inode(diri.get());
+    req->head.args.readdir.frag = fg;
+    req->head.args.readdir.flags = CEPH_READDIR_REPLY_BITFLAGS;
+    if (dirp->last_name.length()) {
+      req->path2.set_path(dirp->last_name);
+    } else if (dirp->hash_order()) {
+      req->head.args.readdir.offset_hash = dirp->offset_high();
+    }
+    req->dirp = dirp;
+  };
+  int op = CEPH_MDS_OP_READDIR;
+  if (d->inode && d->inode->snapid == CEPH_SNAPDIR)
+    op = CEPH_MDS_OP_LSSNAP;
+  return _readdir_r_cb(op,
+    d,
+    cb,
+    fill_readdir_cb,
+    p,
+    want,
+    flags,
+    getref,
+    false);
+}
+
+//
+// NB: this is used for both readdir and readdir_snapdiff results processing
+// hence it should be request type agnostic
+//
+int Client::_readdir_r_cb(int op,
+  dir_result_t *d,
+  add_dirent_cb_t cb,
+  fill_readdir_args_cb_t fill_cb,
+  void *p,
+  unsigned want,
+  unsigned flags,
+  bool getref,
+  bool bypass_cache)
 {
   int caps = statx_to_mask(flags, want);
 
@@ -9301,12 +9464,14 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
   }
 
   // can we read from our cache?
-  ldout(cct, 10) << "offset " << hex << dirp->offset << dec
+  ldout(cct, 10) << __func__
+           << " offset " << hex << dirp->offset << dec
           << " snapid " << dirp->inode->snapid << " (complete && ordered) "
           << dirp->inode->is_complete_and_ordered()
           << " issued " << ccap_string(dirp->inode->caps_issued())
           << dendl;
-  if (dirp->inode->snapid != CEPH_SNAPDIR &&
+  if (!bypass_cache &&
+      dirp->inode->snapid != CEPH_SNAPDIR &&
       dirp->inode->is_complete_and_ordered() &&
       dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED, true)) {
     int err = _readdir_cache_cb(dirp, cb, p, caps, getref);
@@ -9320,7 +9485,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
 
     bool check_caps = true;
     if (!dirp->is_cached()) {
-      int r = _readdir_get_frag(dirp);
+      int r = _readdir_get_frag(op, dirp, fill_cb);
       if (r)
        return r;
       // _readdir_get_frag () may updates dirp->offset if the replied dirfrag is
@@ -9329,7 +9494,8 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
     }
     frag_t fg = dirp->buffer_frag;
 
-    ldout(cct, 10) << "frag " << fg << " buffer size " << dirp->buffer.size()
+    ldout(cct, 10) << __func__
+                   << " frag " << fg << " buffer size " << dirp->buffer.size()
                   << " offset " << hex << dirp->offset << dendl;
 
     for (auto it = std::lower_bound(dirp->buffer.begin(), dirp->buffer.end(),
@@ -9364,7 +9530,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       r = cb(p, &de, &stx, next_off, inode);  // _next_ offset
       cl.lock();
 
-      ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
+      ldout(cct, 15) << __func__
+                     << " de " << de.d_name << " off " << hex << next_off - 1 << dec
+                     << " snap " << entry.inode->snapid
                     << " = " << r << dendl;
       if (r < 0)
        return r;
@@ -9386,7 +9554,8 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
       continue;
     }
 
-    if (diri->shared_gen == dirp->start_shared_gen &&
+    if (!bypass_cache &&
+       diri->shared_gen == dirp->start_shared_gen &&
        diri->dir_release_count == dirp->release_count) {
       if (diri->dir_ordered_count == dirp->ordered_count) {
        ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on " << *diri << dendl;
@@ -9492,6 +9661,81 @@ int Client::readdirplus_r(dir_result_t *d, struct dirent *de,
   return 0;
 }
 
+int Client::readdir_snapdiff(dir_result_t* d1, snapid_t snap2,
+                             struct dirent* out_de,
+                             snapid_t* out_snap)
+{
+  if (!d1 || !d1->inode || d1->inode->snapid == snap2) {
+    lderr(cct) << __func__ << " invalid parameters: "
+               << " d1:" << d1
+               << " d1->inode:" << (d1 ? d1->inode : nullptr)
+               << " snap2 id :" << snap2
+              << dendl;
+    errno = EINVAL;
+    return -errno;
+  }
+
+  auto& de = d1->de;
+  ceph_statx stx;
+  single_readdir sr;
+  sr.de = &de;
+  sr.stx = &stx;
+  sr.inode = NULL;
+  sr.full = false;
+
+  auto fill_snapdiff_cb = [&](dir_result_t* dirp,
+    MetaRequest* req,
+    InodeRef& diri,
+    frag_t fg) {
+      filepath path;
+      diri->make_nosnap_relative_path(path);
+      req->set_filepath(path);
+      req->set_inode(diri.get());
+      req->head.args.snapdiff.snap_other = snap2;
+      req->head.args.snapdiff.frag = fg;
+      req->head.args.snapdiff.flags = CEPH_READDIR_REPLY_BITFLAGS;
+      if (dirp->last_name.length()) {
+       req->path2.set_path(dirp->last_name);
+      } else if (dirp->hash_order()) {
+       req->head.args.snapdiff.offset_hash = dirp->offset_high();
+      }
+      req->dirp = dirp;
+  };
+
+  // our callback fills the dirent and sets sr.full=true on first
+  // call, and returns -1 the second time around.
+  int ret = _readdir_r_cb(CEPH_MDS_OP_READDIR_SNAPDIFF,
+    d1,
+    _readdir_single_dirent_cb,
+    fill_snapdiff_cb,
+    (void*)&sr,
+    0,
+    AT_STATX_DONT_SYNC,
+    false,
+    true);
+  if (ret < -1) {
+    lderr(cct) << __func__ << " error: "
+               << cpp_strerror(ret)
+               << dendl;
+    errno = -ret;  // this sucks.
+    return ret;
+  }
+
+  ldout(cct, 15) << __func__ << " " << ret
+    << " " << sr.de->d_name
+    << " " << stx.stx_dev
+    << dendl;
+  if (sr.full) {
+    if (out_de) {
+      *out_de = de;
+    }
+    if (out_snap) {
+      *out_snap = stx.stx_dev;
+    }
+    return 1;
+  }
+  return 0;
+}
 
 /* getdents */
 struct getdents_result {
@@ -13569,6 +13813,8 @@ int Client::_mknod(Inode *dir, const char *name, mode_t mode, dev_t rdev,
 
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_MKNOD);
 
+  req->set_inode_owner_uid_gid(perms.uid(), perms.gid());
+
   filepath path;
   dir->make_nosnap_relative_path(path);
   path.push_dentry(name);
@@ -13713,6 +13959,8 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
 
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_CREATE);
 
+  req->set_inode_owner_uid_gid(perms.uid(), perms.gid());
+
   filepath path;
   dir->make_nosnap_relative_path(path);
   path.push_dentry(name);
@@ -13790,6 +14038,9 @@ int Client::_mkdir(Inode *dir, const char *name, mode_t mode, const UserPerm& pe
   MetaRequest *req = new MetaRequest(is_snap_op ?
                                     CEPH_MDS_OP_MKSNAP : CEPH_MDS_OP_MKDIR);
 
+  if (!is_snap_op)
+    req->set_inode_owner_uid_gid(perm.uid(), perm.gid());
+
   filepath path;
   dir->make_nosnap_relative_path(path);
   path.push_dentry(name);
@@ -13928,6 +14179,8 @@ int Client::_symlink(Inode *dir, const char *name, const char *target,
 
   MetaRequest *req = new MetaRequest(CEPH_MDS_OP_SYMLINK);
 
+  req->set_inode_owner_uid_gid(perms.uid(), perms.gid());
+
   filepath path;
   dir->make_nosnap_relative_path(path);
   path.push_dentry(name);
@@ -14180,11 +14433,13 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     else
       return -CEPHFS_EROFS;
   }
+
+  // don't allow cross-quota renames
   if (cct->_conf.get_val<bool>("client_quota") && fromdir != todir) {
     Inode *fromdir_root =
-      fromdir->quota.is_enabled(QUOTA_MAX_FILES) ? fromdir : get_quota_root(fromdir, perm, QUOTA_MAX_FILES);
+      fromdir->quota.is_enabled() ? fromdir : get_quota_root(fromdir, perm);
     Inode *todir_root =
-      todir->quota.is_enabled(QUOTA_MAX_FILES) ? todir : get_quota_root(todir, perm, QUOTA_MAX_FILES);
+      todir->quota.is_enabled() ? todir : get_quota_root(todir, perm);
     if (fromdir_root != todir_root) {
       return -CEPHFS_EXDEV;
     }
@@ -14212,12 +14467,13 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     req->old_dentry_drop = CEPH_CAP_FILE_SHARED;
     req->old_dentry_unless = CEPH_CAP_FILE_EXCL;
 
+    de->is_renaming = true;
     req->set_dentry(de);
     req->dentry_drop = CEPH_CAP_FILE_SHARED;
     req->dentry_unless = CEPH_CAP_FILE_EXCL;
 
     InodeRef oldin, otherin;
-    res = _lookup(fromdir, fromname, 0, &oldin, perm);
+    res = _lookup(fromdir, fromname, 0, &oldin, perm, nullptr, true);
     if (res < 0)
       goto fail;
 
@@ -14226,7 +14482,7 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
     req->set_old_inode(oldinode);
     req->old_inode_drop = CEPH_CAP_LINK_SHARED;
 
-    res = _lookup(todir, toname, 0, &otherin, perm);
+    res = _lookup(todir, toname, 0, &otherin, perm, nullptr, true);
     switch (res) {
     case 0:
       {
@@ -14255,6 +14511,12 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
   res = make_request(req, perm, &target);
   ldout(cct, 10) << "rename result is " << res << dendl;
 
+  // if rename fails it will miss waking up the waiters
+  if (op == CEPH_MDS_OP_RENAME && de->is_renaming) {
+    de->is_renaming = false;
+    signal_cond_list(waiting_for_rename);
+  }
+
   // renamed item from our cache
 
   trim_cache();