]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/client/Client.cc
update sources to 12.2.7
[ceph.git] / ceph / src / client / Client.cc
index 1fd435839490a2d240a7a4d207d292aa45871559..41eff1b4b2a247e64b534a4646a8dfd96172a8df 100644 (file)
@@ -236,7 +236,6 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
     remount_cb(NULL),
     ino_invalidate_cb(NULL),
     dentry_invalidate_cb(NULL),
-    getgroups_cb(NULL),
     umask_cb(NULL),
     can_invalidate_dentries(false),
     async_ino_invalidator(m->cct),
@@ -902,8 +901,10 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
     add_update_cap(in, session, st->cap.cap_id, st->cap.caps, st->cap.seq,
                   st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags,
                   request_perms);
-    if (in->auth_cap && in->auth_cap->session == session)
+    if (in->auth_cap && in->auth_cap->session == session) {
       in->max_size = st->max_size;
+      in->rstat = st->rstat;
+    }
   } else
     in->snap_caps |= st->cap.caps;
 
@@ -2076,6 +2077,10 @@ void Client::handle_client_session(MClientSession *m)
     break;
 
   case CEPH_SESSION_STALE:
+    // invalidate session caps/leases
+    session->cap_gen++;
+    session->cap_ttl = ceph_clock_now();
+    session->cap_ttl -= 1;
     renew_caps(session);
     break;
 
@@ -3232,7 +3237,7 @@ void Client::cap_delay_requeue(Inode *in)
   ldout(cct, 10) << "cap_delay_requeue on " << *in << dendl;
   in->hold_caps_until = ceph_clock_now();
   in->hold_caps_until += cct->_conf->client_caps_release_delay;
-  delayed_caps.push_back(&in->cap_item);
+  delayed_list.push_back(&in->delay_cap_item);
 }
 
 void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
@@ -3762,7 +3767,7 @@ void Client::_invalidate_inode_cache(Inode *in, int64_t off, int64_t len)
   if (cct->_conf->client_oc) {
     vector<ObjectExtent> ls;
     Striper::file_to_extents(cct, in->ino, &in->layout, off, len, in->truncate_size, ls);
-    objectcacher->discard_set(&in->oset, ls);
+    objectcacher->discard_writeback(&in->oset, ls, nullptr);
   }
 
   _schedule_invalidate_callback(in, off, len);
@@ -3901,7 +3906,6 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
     cap->session = mds_session;
     cap->inode = in;
     cap->gen = mds_session->cap_gen;
-    cap_list.push_back(&in->cap_item);
   }
 
   check_cap_issue(in, cap, issued);
@@ -3925,6 +3929,7 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
   cap->seq = seq;
   cap->issue_seq = seq;
   cap->mseq = mseq;
+  cap->gen = mds_session->cap_gen;
   cap->latest_perms = cap_perms;
   ldout(cct, 10) << "add_update_cap issued " << ccap_string(old_caps) << " -> " << ccap_string(cap->issued)
           << " from mds." << mds
@@ -4020,7 +4025,7 @@ void Client::remove_session_caps(MetaSession *s)
        in->flushing_cap_tids.clear();
       }
       in->flushing_caps = 0;
-      in->dirty_caps = 0;
+      in->mark_caps_clean();
       put_inode(in);
     }
   }
@@ -4085,16 +4090,17 @@ void Client::_invalidate_kernel_dcache()
   }
 }
 
-void Client::trim_caps(MetaSession *s, int max)
+void Client::trim_caps(MetaSession *s, uint64_t max)
 {
   mds_rank_t mds = s->mds_num;
-  int caps_size = s->caps.size();
+  size_t caps_size = s->caps.size();
   ldout(cct, 10) << "trim_caps mds." << mds << " max " << max 
     << " caps " << caps_size << dendl;
 
-  int trimmed = 0;
-  xlist<Cap*>::iterator p = s->caps.begin();
-  std::set<InodeRef> anchor; /* prevent put_inode from deleting all caps during traversal */
+  uint64_t trimmed = 0;
+  auto p = s->caps.begin();
+  std::set<Dentry *> to_trim; /* this avoids caps other than the one we're
+                               * looking at from getting deleted during traversal. */
   while ((caps_size - trimmed) > max && !p.end()) {
     Cap *cap = *p;
     InodeRef in(cap->inode);
@@ -4109,8 +4115,7 @@ void Client::trim_caps(MetaSession *s, int max)
       // disposable non-auth cap
       if (!(get_caps_used(in.get()) & ~oissued & mine)) {
        ldout(cct, 20) << " removing unused, unneeded non-auth cap on " << *in << dendl;
-       remove_cap(cap, true);
-        /* N.B. no need to push onto anchor, as we are only removing one cap */
+       cap = (remove_cap(cap, true), nullptr);
        trimmed++;
       }
     } else {
@@ -4127,9 +4132,8 @@ void Client::trim_caps(MetaSession *s, int max)
            // the end of this function.
            _schedule_invalidate_dentry_callback(dn, true);
          }
-          ldout(cct, 20) << " anchoring inode: " << in->ino << dendl;
-          anchor.insert(in);
-         trim_dentry(dn);
+          ldout(cct, 20) << " queueing dentry for trimming: " << dn->name << dendl;
+          to_trim.insert(dn);
         } else {
           ldout(cct, 20) << "  not expirable: " << dn->name << dendl;
          all = false;
@@ -4141,8 +4145,11 @@ void Client::trim_caps(MetaSession *s, int max)
       }
     }
   }
-  ldout(cct, 20) << " clearing anchored inodes" << dendl;
-  anchor.clear();
+  ldout(cct, 20) << " trimming queued dentries: " << dendl;
+  for (const auto &dn : to_trim) {
+    trim_dentry(dn);
+  }
+  to_trim.clear();
 
   caps_size = s->caps.size();
   if (caps_size > max)
@@ -4159,15 +4166,6 @@ void Client::force_session_readonly(MetaSession *s)
   }
 }
 
-void Client::mark_caps_dirty(Inode *in, int caps)
-{
-  ldout(cct, 10) << "mark_caps_dirty " << *in << " " << ccap_string(in->dirty_caps) << " -> "
-          << ccap_string(in->dirty_caps | caps) << dendl;
-  if (caps && !in->caps_dirty())
-    in->get();
-  in->dirty_caps |= caps;
-}
-
 int Client::mark_caps_flushing(Inode *in, ceph_tid_t* ptid)
 {
   MetaSession *session = in->auth_cap->session;
@@ -4186,7 +4184,7 @@ int Client::mark_caps_flushing(Inode *in, ceph_tid_t* ptid)
   }
 
   in->flushing_caps |= flushing;
-  in->dirty_caps = 0;
+  in->mark_caps_clean();
  
   if (!in->flushing_cap_item.is_on_list())
     session->flushing_caps.push_back(&in->flushing_cap_item);
@@ -4222,20 +4220,20 @@ void Client::adjust_session_flushing_caps(Inode *in, MetaSession *old_s,  MetaSe
 void Client::flush_caps_sync()
 {
   ldout(cct, 10) << __func__ << dendl;
-  xlist<Inode*>::iterator p = delayed_caps.begin();
+  xlist<Inode*>::iterator p = delayed_list.begin();
   while (!p.end()) {
     unsigned flags = CHECK_CAPS_NODELAY;
     Inode *in = *p;
 
     ++p;
-    delayed_caps.pop_front();
-    if (p.end() && cap_list.empty())
+    delayed_list.pop_front();
+    if (p.end() && dirty_list.empty())
       flags |= CHECK_CAPS_SYNCHRONOUS;
     check_caps(in, flags);
   }
 
   // other caps, too
-  p = cap_list.begin();
+  p = dirty_list.begin();
   while (!p.end()) {
     unsigned flags = CHECK_CAPS_NODELAY;
     Inode *in = *p;
@@ -5041,6 +5039,7 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
                << " caps now " << ccap_string(new_caps)
                << " was " << ccap_string(old_caps) << dendl;
   cap->seq = m->get_seq();
+  cap->gen = session->cap_gen;
 
   in->layout = m->get_layout();
 
@@ -5069,6 +5068,12 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
     ::decode(in->xattrs, p);
     in->xattr_version = m->head.xattr_version;
   }
+
+  if ((new_caps & CEPH_CAP_FILE_SHARED) && m->dirstat_is_valid()) {
+    in->dirstat.nfiles = m->get_nfiles();
+    in->dirstat.nsubdirs = m->get_nsubdirs();
+  }
+
   update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
                         m->get_change_attr(), m->get_time_warp_seq(), m->get_ctime(),
                         m->get_mtime(), m->get_atime(),
@@ -5104,17 +5109,16 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
     else if (revoked & ceph_deleg_caps_for_type(CEPH_DELEGATION_WR))
       in->recall_deleg(true);
 
-    if (((used & ~new_caps) & CEPH_CAP_FILE_BUFFER)
-        && !_flush(in, new C_Client_FlushComplete(this, in))) {
+    if ((used & revoked & CEPH_CAP_FILE_BUFFER) &&
+       !_flush(in, new C_Client_FlushComplete(this, in))) {
       // waitin' for flush
-    } else if ((old_caps & ~new_caps) & CEPH_CAP_FILE_CACHE) {
+    } else if (revoked & CEPH_CAP_FILE_CACHE) {
       if (_release(in))
        check = true;
     } else {
       cap->wanted = 0; // don't let check_caps skip sending a response to MDS
       check = true;
     }
-
   } else if (old_caps == new_caps) {
     ldout(cct, 10) << "  caps unchanged at " << ccap_string(old_caps) << dendl;
   } else {
@@ -5149,63 +5153,6 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
   m->put();
 }
 
-int Client::_getgrouplist(gid_t** sgids, uid_t uid, gid_t gid)
-{
-  // cppcheck-suppress variableScope
-  int sgid_count;
-  gid_t *sgid_buf;
-
-  if (getgroups_cb) {
-    sgid_count = getgroups_cb(callback_handle, &sgid_buf);
-    if (sgid_count > 0) {
-      *sgids = sgid_buf;
-      return sgid_count;
-    }
-  }
-
-#if HAVE_GETGROUPLIST
-  struct passwd *pw;
-  pw = getpwuid(uid);
-  if (pw == NULL) {
-    ldout(cct, 3) << "getting user entry failed" << dendl;
-    return -errno;
-  }
-  //use PAM to get the group list
-  // initial number of group entries, defaults to posix standard of 16
-  // PAM implementations may provide more than 16 groups....
-  sgid_count = 16;
-  sgid_buf = (gid_t*)malloc(sgid_count * sizeof(gid_t));
-  if (sgid_buf == NULL) {
-    ldout(cct, 3) << "allocating group memory failed" << dendl;
-    return -ENOMEM;
-  }
-
-  while (1) {
-#if defined(__APPLE__)
-    if (getgrouplist(pw->pw_name, gid, (int*)sgid_buf, &sgid_count) == -1) {
-#else
-    if (getgrouplist(pw->pw_name, gid, sgid_buf, &sgid_count) == -1) {
-#endif
-      // we need to resize the group list and try again
-      void *_realloc = NULL;
-      if ((_realloc = realloc(sgid_buf, sgid_count * sizeof(gid_t))) == NULL) {
-       ldout(cct, 3) << "allocating group memory failed" << dendl;
-       free(sgid_buf);
-       return -ENOMEM;
-      }
-      sgid_buf = (gid_t*)_realloc;
-      continue;
-    }
-    // list was successfully retrieved
-    break;
-  }
-  *sgids = sgid_buf;
-  return sgid_count;
-#else
-  return 0;
-#endif
-}
-
 int Client::inode_permission(Inode *in, const UserPerm& perms, unsigned want)
 {
   if (perms.uid() == 0)
@@ -6054,14 +6001,13 @@ void Client::tick()
   }
 
   // delayed caps
-  xlist<Inode*>::iterator p = delayed_caps.begin();
+  xlist<Inode*>::iterator p = delayed_list.begin();
   while (!p.end()) {
     Inode *in = *p;
     ++p;
     if (in->hold_caps_until > now)
       break;
-    delayed_caps.pop_front();
-    cap_list.push_back(&in->cap_item);
+    delayed_list.pop_front();
     check_caps(in, CHECK_CAPS_NODELAY);
   }
 
@@ -6720,11 +6666,11 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
     in->cap_dirtier_uid = perms.uid();
     in->cap_dirtier_gid = perms.gid();
     if (issued & CEPH_CAP_AUTH_EXCL)
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
     else if (issued & CEPH_CAP_FILE_EXCL)
-      mark_caps_dirty(in, CEPH_CAP_FILE_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_FILE_EXCL);
     else if (issued & CEPH_CAP_XATTR_EXCL)
-      mark_caps_dirty(in, CEPH_CAP_XATTR_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_XATTR_EXCL);
     else
       mask |= CEPH_SETATTR_CTIME;
   }
@@ -6739,7 +6685,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->uid = stx->stx_uid;
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_UID;
       kill_sguid = true;
       ldout(cct,10) << "changing uid to " << stx->stx_uid << dendl;
@@ -6749,7 +6695,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->gid = stx->stx_gid;
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_GID;
       kill_sguid = true;
       ldout(cct,10) << "changing gid to " << stx->stx_gid << dendl;
@@ -6760,13 +6706,13 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->mode = (in->mode & ~07777) | (stx->stx_mode & 07777);
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_MODE;
       ldout(cct,10) << "changing mode to " << stx->stx_mode << dendl;
     } else if (kill_sguid && S_ISREG(in->mode) && (in->mode & (S_IXUSR|S_IXGRP|S_IXOTH))) {
       /* Must squash the any setuid/setgid bits with an ownership change */
       in->mode &= ~(S_ISUID|S_ISGID);
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
     }
 
     if (mask & CEPH_SETATTR_BTIME) {
@@ -6774,7 +6720,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->btime = utime_t(stx->stx_btime);
-      mark_caps_dirty(in, CEPH_CAP_AUTH_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_AUTH_EXCL);
       mask &= ~CEPH_SETATTR_BTIME;
       ldout(cct,10) << "changing btime to " << in->btime << dendl;
     }
@@ -6793,7 +6739,7 @@ int Client::_do_setattr(Inode *in, struct ceph_statx *stx, int mask,
       in->cap_dirtier_uid = perms.uid();
       in->cap_dirtier_gid = perms.gid();
       in->time_warp_seq++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_EXCL);
+      in->mark_caps_dirty(CEPH_CAP_FILE_EXCL);
       mask &= ~(CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME);
     }
   }
@@ -7117,7 +7063,22 @@ int Client::fill_stat(Inode *in, struct stat *st, frag_info_t *dirstat, nest_inf
   st->st_dev = in->snapid;
   st->st_mode = in->mode;
   st->st_rdev = in->rdev;
-  st->st_nlink = in->nlink;
+  if (in->is_dir()) {
+    switch (in->nlink) {
+      case 0:
+        st->st_nlink = 0; /* dir is unlinked */
+        break;
+      case 1:
+        st->st_nlink = 1 /* parent dentry */
+                       + 1 /* <dir>/. */
+                       + in->dirstat.nsubdirs; /* include <dir>/. self-reference */
+        break;
+      default:
+        ceph_abort();
+    }
+  } else {
+    st->st_nlink = in->nlink;
+  }
   st->st_uid = in->uid;
   st->st_gid = in->gid;
   if (in->ctime > in->mtime) {
@@ -7184,7 +7145,22 @@ void Client::fill_statx(Inode *in, unsigned int mask, struct ceph_statx *stx)
   }
 
   if (mask & CEPH_CAP_LINK_SHARED) {
-    stx->stx_nlink = in->nlink;
+    if (in->is_dir()) {
+      switch (in->nlink) {
+        case 0:
+          stx->stx_nlink = 0; /* dir is unlinked */
+          break;
+        case 1:
+          stx->stx_nlink = 1 /* parent dentry */
+                           + 1 /* <dir>/. */
+                           + in->dirstat.nsubdirs; /* include <dir>/. self-reference */
+          break;
+        default:
+          ceph_abort();
+      }
+    } else {
+      stx->stx_nlink = in->nlink;
+    }
     stx->stx_mask |= CEPH_STATX_NLINK;
   }
 
@@ -7755,6 +7731,7 @@ int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
     else
       dirp->next_offset = dirp->offset_low();
     dirp->last_name = dn_name; // we successfully returned this one; update!
+    dirp->release_count = 0; // last_name no longer match cache index
     if (r > 0)
       return r;
   }
@@ -8872,7 +8849,7 @@ done:
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
       r = uninline_ret;
@@ -9156,8 +9133,9 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
 
   // check quota
   uint64_t endoff = offset + size;
-  if (endoff > in->size && is_quota_bytes_exceeded(in, endoff - in->size,
-                                                  f->actor_perms)) {
+  std::list<InodeRef> quota_roots;
+  if (endoff > in->size &&
+      is_quota_bytes_exceeded(in, endoff - in->size, f->actor_perms, &quota_roots)) {
     return -EDQUOT;
   }
 
@@ -9332,9 +9310,9 @@ success:
   // extend file?
   if (totalwritten + offset > in->size) {
     in->size = totalwritten + offset;
-    mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+    in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-    if (is_quota_bytes_approaching(in, f->actor_perms)) {
+    if (is_quota_bytes_approaching(in, quota_roots)) {
       check_caps(in, CHECK_CAPS_NODELAY);
     } else if (is_max_size_approaching(in)) {
       check_caps(in, 0);
@@ -9348,7 +9326,7 @@ success:
   // mtime
   in->mtime = ceph_clock_now();
   in->change_attr++;
-  mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+  in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
 done:
 
@@ -9363,7 +9341,7 @@ done:
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
       r = uninline_ret;
@@ -9478,6 +9456,8 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   } else ldout(cct, 10) << "no metadata needs to commit" << dendl;
 
   if (!syncdataonly && !in->unsafe_ops.empty()) {
+    flush_mdlog_sync();
+
     MetaRequest *req = in->unsafe_ops.back();
     ldout(cct, 15) << "waiting on unsafe requests, last tid " << req->get_tid() <<  dendl;
 
@@ -10058,7 +10038,6 @@ void Client::ll_register_callbacks(struct client_callback_args *args)
   ldout(cct, 10) << "ll_register_callbacks cb " << args->handle
                 << " invalidate_ino_cb " << args->ino_cb
                 << " invalidate_dentry_cb " << args->dentry_cb
-                << " getgroups_cb" << args->getgroups_cb
                 << " switch_interrupt_cb " << args->switch_intr_cb
                 << " remount_cb " << args->remount_cb
                 << dendl;
@@ -10079,7 +10058,6 @@ void Client::ll_register_callbacks(struct client_callback_args *args)
     remount_cb = args->remount_cb;
     remount_finisher.start();
   }
-  getgroups_cb = args->getgroups_cb;
   umask_cb = args->umask_cb;
 }
 
@@ -10852,7 +10830,11 @@ int Client::_getxattr(Inode *in, const char *name, void *value, size_t size,
 
     // Do a force getattr to get the latest quota before returning
     // a value to userspace.
-    r = _getattr(in, 0, perms, true);
+    int flags = 0;
+    if (vxattr->flags & VXATTR_RSTAT) {
+      flags |= CEPH_STAT_RSTAT;
+    }
+    r = _getattr(in, flags, perms, true);
     if (r != 0) {
       // Error from getattr!
       return r;
@@ -11369,6 +11351,16 @@ size_t Client::_vxattrcb_dir_rctime(Inode *in, char *val, size_t size)
   readonly: true,                                              \
   hidden: false,                                               \
   exists_cb: NULL,                                             \
+  flags: 0,                                                     \
+}
+#define XATTR_NAME_CEPH2(_type, _name, _flags)                 \
+{                                                              \
+  name: CEPH_XATTR_NAME(_type, _name),                         \
+  getxattr_cb: &Client::_vxattrcb_ ## _type ## _ ## _name,     \
+  readonly: true,                                              \
+  hidden: false,                                               \
+  exists_cb: NULL,                                             \
+  flags: _flags,                                               \
 }
 #define XATTR_LAYOUT_FIELD(_type, _name, _field)               \
 {                                                              \
@@ -11377,6 +11369,7 @@ size_t Client::_vxattrcb_dir_rctime(Inode *in, char *val, size_t size)
   readonly: false,                                             \
   hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_layout_exists,                 \
+  flags: 0,                                                     \
 }
 #define XATTR_QUOTA_FIELD(_type, _name)                                \
 {                                                              \
@@ -11385,6 +11378,7 @@ size_t Client::_vxattrcb_dir_rctime(Inode *in, char *val, size_t size)
   readonly: false,                                             \
   hidden: true,                                                        \
   exists_cb: &Client::_vxattrcb_quota_exists,                  \
+  flags: 0,                                                     \
 }
 
 const Client::VXattr Client::_dir_vxattrs[] = {
@@ -11394,6 +11388,7 @@ const Client::VXattr Client::_dir_vxattrs[] = {
     readonly: false,
     hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
+    flags: 0,
   },
   XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
   XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
@@ -11403,17 +11398,18 @@ const Client::VXattr Client::_dir_vxattrs[] = {
   XATTR_NAME_CEPH(dir, entries),
   XATTR_NAME_CEPH(dir, files),
   XATTR_NAME_CEPH(dir, subdirs),
-  XATTR_NAME_CEPH(dir, rentries),
-  XATTR_NAME_CEPH(dir, rfiles),
-  XATTR_NAME_CEPH(dir, rsubdirs),
-  XATTR_NAME_CEPH(dir, rbytes),
-  XATTR_NAME_CEPH(dir, rctime),
+  XATTR_NAME_CEPH2(dir, rentries, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rfiles, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rsubdirs, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rbytes, VXATTR_RSTAT),
+  XATTR_NAME_CEPH2(dir, rctime, VXATTR_RSTAT),
   {
     name: "ceph.quota",
     getxattr_cb: &Client::_vxattrcb_quota,
     readonly: false,
     hidden: true,
     exists_cb: &Client::_vxattrcb_quota_exists,
+    flags: 0,
   },
   XATTR_QUOTA_FIELD(quota, max_bytes),
   XATTR_QUOTA_FIELD(quota, max_files),
@@ -11427,6 +11423,7 @@ const Client::VXattr Client::_file_vxattrs[] = {
     readonly: false,
     hidden: true,
     exists_cb: &Client::_vxattrcb_layout_exists,
+    flags: 0,
   },
   XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
   XATTR_LAYOUT_FIELD(file, layout, stripe_count),
@@ -12395,7 +12392,7 @@ int Client::ll_get_stripe_osd(Inode *in, uint64_t blockno,
 {
   Mutex::Locker lock(client_lock);
 
-  inodeno_t ino = ll_get_inodeno(in);
+  inodeno_t ino = in->ino;
   uint32_t object_size = layout->object_size;
   uint32_t su = layout->stripe_unit;
   uint32_t stripe_count = layout->stripe_count;
@@ -12879,6 +12876,19 @@ int Client::ll_fsync(Fh *fh, bool syncdataonly)
   return r;
 }
 
+int Client::ll_sync_inode(Inode *in, bool syncdataonly)
+{
+  Mutex::Locker lock(client_lock);
+  ldout(cct, 3) << "ll_sync_inode " << *in << " " << dendl;
+  tout(cct) << "ll_sync_inode" << std::endl;
+  tout(cct) << (unsigned long)in << std::endl;
+
+  if (unmounting)
+    return -ENOTCONN;
+
+  return _fsync(in, syncdataonly);
+}
+
 #ifdef FALLOC_FL_PUNCH_HOLE
 
 int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
@@ -12906,9 +12916,10 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     return -EBADF;
 
   uint64_t size = offset + length;
+  std::list<InodeRef> quota_roots;
   if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
       size > in->size &&
-      is_quota_bytes_exceeded(in, size - in->size, fh->actor_perms)) {
+      is_quota_bytes_exceeded(in, size - in->size, fh->actor_perms, &quota_roots)) {
     return -EDQUOT;
   }
 
@@ -12943,7 +12954,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
       }
       in->mtime = ceph_clock_now();
       in->change_attr++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
     } else {
       if (in->inline_version < CEPH_INLINE_NONE) {
         onuninline = new C_SafeCond(&uninline_flock,
@@ -12969,7 +12980,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
                  0, true, onfinish);
       in->mtime = ceph_clock_now();
       in->change_attr++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
       client_lock.Unlock();
       flock.Lock();
@@ -12985,9 +12996,9 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
       in->size = size;
       in->mtime = ceph_clock_now();
       in->change_attr++;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
 
-      if (is_quota_bytes_approaching(in, fh->actor_perms)) {
+      if (is_quota_bytes_approaching(in, quota_roots)) {
         check_caps(in, CHECK_CAPS_NODELAY);
       } else if (is_max_size_approaching(in)) {
        check_caps(in, 0);
@@ -13006,7 +13017,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
       in->inline_data.clear();
       in->inline_version = CEPH_INLINE_NONE;
-      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      in->mark_caps_dirty(CEPH_CAP_FILE_WR);
       check_caps(in, 0);
     } else
       r = uninline_ret;
@@ -13473,6 +13484,7 @@ void Client::ms_handle_remote_reset(Connection *con)
 
        case MetaSession::STATE_OPEN:
          {
+           objecter->maybe_request_map(); /* to check if we are blacklisted */
            const md_config_t *conf = cct->_conf;
            if (conf->client_reconnect_stale) {
              ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
@@ -13615,32 +13627,34 @@ bool Client::is_quota_files_exceeded(Inode *in, const UserPerm& perms)
 }
 
 bool Client::is_quota_bytes_exceeded(Inode *in, int64_t new_bytes,
-                                    const UserPerm& perms)
+                                    const UserPerm& perms,
+                                    std::list<InodeRef>* quota_roots)
 {
   return check_quota_condition(in, perms,
-      [&new_bytes](const Inode &in) {
+      [&new_bytes, quota_roots](const Inode &in) {
+       if (quota_roots)
+         quota_roots->emplace_back(const_cast<Inode*>(&in));
         return in.quota.max_bytes && (in.rstat.rbytes + new_bytes)
                > in.quota.max_bytes;
       });
 }
 
-bool Client::is_quota_bytes_approaching(Inode *in, const UserPerm& perms)
+bool Client::is_quota_bytes_approaching(Inode *in, std::list<InodeRef>& quota_roots)
 {
-  return check_quota_condition(in, perms,
-      [](const Inode &in) {
-        if (in.quota.max_bytes) {
-          if (in.rstat.rbytes >= in.quota.max_bytes) {
-            return true;
-          }
-
-          assert(in.size >= in.reported_size);
-          const uint64_t space = in.quota.max_bytes - in.rstat.rbytes;
-          const uint64_t size = in.size - in.reported_size;
-          return (space >> 4) < size;
-        } else {
-          return false;
-        }
-      });
+  assert(in->size >= in->reported_size);
+  const uint64_t size = in->size - in->reported_size;
+
+  for (auto& diri : quota_roots) {
+    if (diri->quota.max_bytes) {
+      if (diri->rstat.rbytes >= diri->quota.max_bytes)
+       return true;
+
+      uint64_t space = diri->quota.max_bytes - diri->rstat.rbytes;
+      if ((space >> 4) < size)
+       return true;
+    }
+  }
+  return false;
 }
 
 enum {
@@ -13895,13 +13909,6 @@ void Client::handle_conf_change(const struct md_config_t *conf,
   }
 }
 
-void Client::init_groups(UserPerm *perms)
-{
-  gid_t *sgids;
-  int count = _getgrouplist(&sgids, perms->uid(), perms->gid());
-  perms->init_gids(sgids, count);
-}
-
 void intrusive_ptr_add_ref(Inode *in)
 {
   in->get();