]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_file.cc
import ceph 16.2.6
[ceph.git] / ceph / src / rgw / rgw_file.cc
index 9795f7e6dcc9d24cf2d2eceb3ecb0ad4e973e4f3..b9b2b3cbd703471fe5d20577a3f8a376137a8d1b 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "include/compat.h"
 #include "include/rados/rgw_file.h"
@@ -8,7 +8,6 @@
 #include <sys/stat.h>
 
 #include "rgw_lib.h"
-#include "rgw_rados.h"
 #include "rgw_resolve.h"
 #include "rgw_op.h"
 #include "rgw_rest.h"
 #include "rgw_auth_s3.h"
 #include "rgw_user.h"
 #include "rgw_bucket.h"
+#include "rgw_zone.h"
 #include "rgw_file.h"
 #include "rgw_lib_frontend.h"
+#include "rgw_perf_counters.h"
 #include "common/errno.h"
 
+#include "services/svc_zone.h"
+
 #include <atomic>
 
 #define dout_subsys ceph_subsys_rgw
@@ -66,13 +69,40 @@ namespace rgw {
     return 0;
   }
 
+  class XattrHash
+  {
+  public:
+    std::size_t operator()(const rgw_xattrstr& att) const noexcept {
+      return XXH64(att.val, att.len, 5882300);
+    }
+  };
+
+  class XattrEqual
+  {
+  public:
+    bool operator()(const rgw_xattrstr& lhs, const rgw_xattrstr& rhs) const {
+      return ((lhs.len == rhs.len) &&
+             (strncmp(lhs.val, rhs.val, lhs.len) == 0));
+    }
+  };
+
+  /* well-known attributes */
+  static const std::unordered_set<
+    rgw_xattrstr, XattrHash, XattrEqual> rgw_exposed_attrs = {
+    rgw_xattrstr{const_cast<char*>(RGW_ATTR_ETAG), sizeof(RGW_ATTR_ETAG)-1}
+  };
+
+  static inline bool is_exposed_attr(const rgw_xattrstr& k) {
+    return (rgw_exposed_attrs.find(k) != rgw_exposed_attrs.end());
+  }
+
   LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent, const char *path,
                                       RGWLibFS::BucketStats& bs,
                                       uint32_t flags)
   {
     LookupFHResult fhr{nullptr, 0};
     std::string bucket_name{path};
-    RGWStatBucketRequest req(cct, get_user(), bucket_name, bs);
+    RGWStatBucketRequest req(cct, rgwlib.get_store()->get_user(user.user_id), bucket_name, bs);
 
     int rc = rgwlib.get_fe()->execute_req(&req);
     if ((rc == 0) &&
@@ -105,6 +135,42 @@ namespace rgw {
     return fhr;
   }
 
+  LookupFHResult RGWLibFS::fake_leaf(RGWFileHandle* parent,
+                                    const char *path,
+                                    enum rgw_fh_type type,
+                                    struct stat *st, uint32_t st_mask,
+                                    uint32_t flags)
+  {
+    /* synthesize a minimal handle from parent, path, type, and st */
+    using std::get;
+
+    flags |= RGWFileHandle::FLAG_CREATE;
+
+    switch (type) {
+    case RGW_FS_TYPE_DIRECTORY:
+      flags |= RGWFileHandle::FLAG_DIRECTORY;
+      break;
+    default:
+      /* file */
+      break;
+    };
+
+    LookupFHResult fhr = lookup_fh(parent, path, flags);
+    if (get<0>(fhr)) {
+      RGWFileHandle* rgw_fh = get<0>(fhr);
+      if (st) {        
+       lock_guard guard(rgw_fh->mtx);
+       if (st_mask & RGW_SETATTR_SIZE) {
+         rgw_fh->set_size(st->st_size);
+       }
+       if (st_mask & RGW_SETATTR_MTIME) {
+         rgw_fh->set_times(st->st_mtim);
+       }
+      } /* st */
+    } /* rgw_fh */
+    return fhr;
+  } /* RGWLibFS::fake_leaf */
+
   LookupFHResult RGWLibFS::stat_leaf(RGWFileHandle* parent,
                                     const char *path,
                                     enum rgw_fh_type type,
@@ -131,7 +197,7 @@ namespace rgw {
        if (type == RGW_FS_TYPE_DIRECTORY)
          continue;
 
-       RGWStatObjRequest req(cct, get_user(),
+       RGWStatObjRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
                              parent->bucket_name(), obj_path,
                              RGWStatObjRequest::FLAG_NONE);
        int rc = rgwlib.get_fe()->execute_req(&req);
@@ -146,7 +212,10 @@ namespace rgw {
            /* restore attributes */
            auto ux_key = req.get_attr(RGW_ATTR_UNIX_KEY1);
            auto ux_attrs = req.get_attr(RGW_ATTR_UNIX1);
-           if (ux_key && ux_attrs) {
+            rgw_fh->set_etag(*(req.get_attr(RGW_ATTR_ETAG)));
+            rgw_fh->set_acls(*(req.get_attr(RGW_ATTR_ACL)));
+           if (!(flags & RGWFileHandle::FLAG_IN_CB) &&
+               ux_key && ux_attrs) {
               DecodeAttrsResult dar = rgw_fh->decode_attrs(ux_key, ux_attrs);
               if (get<0>(dar) || get<1>(dar)) {
                 update_fh(rgw_fh);
@@ -165,7 +234,7 @@ namespace rgw {
          continue;
 
        obj_path += "/";
-       RGWStatObjRequest req(cct, get_user(),
+       RGWStatObjRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
                              parent->bucket_name(), obj_path,
                              RGWStatObjRequest::FLAG_NONE);
        int rc = rgwlib.get_fe()->execute_req(&req);
@@ -180,7 +249,10 @@ namespace rgw {
            /* restore attributes */
            auto ux_key = req.get_attr(RGW_ATTR_UNIX_KEY1);
            auto ux_attrs = req.get_attr(RGW_ATTR_UNIX1);
-           if (ux_key && ux_attrs) {
+            rgw_fh->set_etag(*(req.get_attr(RGW_ATTR_ETAG)));
+            rgw_fh->set_acls(*(req.get_attr(RGW_ATTR_ACL)));
+           if (!(flags & RGWFileHandle::FLAG_IN_CB) &&
+               ux_key && ux_attrs) {
               DecodeAttrsResult dar = rgw_fh->decode_attrs(ux_key, ux_attrs);
               if (get<0>(dar) || get<1>(dar)) {
                 update_fh(rgw_fh);
@@ -194,7 +266,8 @@ namespace rgw {
       case 2:
       {
        std::string object_name{path};
-       RGWStatLeafRequest req(cct, get_user(), parent, object_name);
+       RGWStatLeafRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                              parent, object_name);
        int rc = rgwlib.get_fe()->execute_req(&req);
        if ((rc == 0) &&
            (req.get_ret() == 0)) {
@@ -246,12 +319,35 @@ namespace rgw {
     if (rgw_fh->deleted())
       return -ESTALE;
 
-    RGWReadRequest req(get_context(), get_user(), rgw_fh, offset, length,
-                      buffer);
+    RGWReadRequest req(get_context(), rgwlib.get_store()->get_user(user.user_id),
+                      rgw_fh, offset, length, buffer);
 
     int rc = rgwlib.get_fe()->execute_req(&req);
     if ((rc == 0) &&
-       (req.get_ret() == 0)) {
+        ((rc = req.get_ret()) == 0)) {
+      lock_guard guard(rgw_fh->mtx);
+      rgw_fh->set_atime(real_clock::to_timespec(real_clock::now()));
+      *bytes_read = req.nread;
+    }
+
+    return rc;
+  }
+
+  int RGWLibFS::readlink(RGWFileHandle* rgw_fh, uint64_t offset, size_t length,
+                     size_t* bytes_read, void* buffer, uint32_t flags)
+  {
+    if (! rgw_fh->is_link())
+      return -EINVAL;
+
+    if (rgw_fh->deleted())
+      return -ESTALE;
+
+    RGWReadRequest req(get_context(), rgwlib.get_store()->get_user(user.user_id),
+                      rgw_fh, offset, length, buffer);
+
+    int rc = rgwlib.get_fe()->execute_req(&req);
+    if ((rc == 0) &&
+        ((rc = req.get_ret()) == 0)) {
       lock_guard(rgw_fh->mtx);
       rgw_fh->set_atime(real_clock::to_timespec(real_clock::now()));
       *bytes_read = req.nread;
@@ -304,14 +400,15 @@ namespace rgw {
       } else {
        /* delete object w/key "<bucket>/" (uxattrs), if any */
        string oname{"/"};
-       RGWDeleteObjRequest req(cct, get_user(), bkt_fh->bucket_name(), oname);
+       RGWDeleteObjRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                               bkt_fh->bucket_name(), oname);
        rc = rgwlib.get_fe()->execute_req(&req);
        /* don't care if ENOENT */
        unref(bkt_fh);
       }
 
       string bname{name};
-      RGWDeleteBucketRequest req(cct, get_user(), bname);
+      RGWDeleteBucketRequest req(cct, rgwlib.get_store()->get_user(user.user_id), bname);
       rc = rgwlib.get_fe()->execute_req(&req);
       if (! rc) {
        rc = req.get_ret();
@@ -326,6 +423,7 @@ namespace rgw {
         * atomicity at this endpoint */
        struct rgw_file_handle *fh;
        rc = rgw_lookup(get_fs(), parent->get_fh(), name, &fh,
+                       nullptr /* st */, 0 /* mask */,
                        RGW_LOOKUP_FLAG_NONE);
        if (!! rc)
          return rc;
@@ -346,7 +444,7 @@ namespace rgw {
        }
        oname += "/";
       }
-      RGWDeleteObjRequest req(cct, get_user(), parent->bucket_name(),
+      RGWDeleteObjRequest req(cct, rgwlib.get_store()->get_user(user.user_id), parent->bucket_name(),
                              oname);
       rc = rgwlib.get_fe()->execute_req(&req);
       if (! rc) {
@@ -380,7 +478,6 @@ namespace rgw {
     /* XXX initial implementation: try-copy, and delete if copy
      * succeeds */
     int rc = -EINVAL;
-
     real_time t;
 
     std::string src_name{_src_name};
@@ -425,8 +522,8 @@ namespace rgw {
       switch (ix) {
       case 0:
       {
-       RGWCopyObjRequest req(cct, get_user(), src_fh, dst_fh, src_name,
-                             dst_name);
+       RGWCopyObjRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                             src_fh, dst_fh, src_name, dst_name);
        int rc = rgwlib.get_fe()->execute_req(&req);
        if ((rc != 0) ||
            ((rc = req.get_ret()) != 0)) {
@@ -481,7 +578,7 @@ namespace rgw {
       }
       goto out;
       default:
-       abort();
+       ceph_abort();
       } /* switch */
     } /* ix */
   unlock:
@@ -499,10 +596,12 @@ namespace rgw {
     rgw_file_handle *lfh;
 
     rc = rgw_lookup(get_fs(), parent->get_fh(), name, &lfh,
+                   nullptr /* st */, 0 /* mask */,
                    RGW_LOOKUP_FLAG_NONE);
     if (! rc) {
       /* conflict! */
       rc = rgw_fh_rele(get_fs(), lfh, RGW_FH_RELE_FLAG_NONE);
+      // ignore return code
       return MkObjResult{nullptr, -EEXIST};
     }
 
@@ -522,7 +621,7 @@ namespace rgw {
       /* save attrs */
       rgw_fh->encode_attrs(ux_key, ux_attrs);
       if (st)
-        rgw_fh->stat(st);
+        rgw_fh->stat(st, RGWFileHandle::FLAG_LOCKED);
       get<0>(mkr) = rgw_fh;
     } else {
       get<1>(mkr) = -EIO;
@@ -545,7 +644,8 @@ namespace rgw {
        return mkr;
       }
 
-      RGWCreateBucketRequest req(get_context(), get_user(), bname);
+      RGWCreateBucketRequest req(get_context(),
+                                rgwlib.get_store()->get_user(user.user_id), bname);
 
       /* save attrs */
       req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
@@ -571,8 +671,8 @@ namespace rgw {
        return mkr;
       }
 
-      RGWPutObjRequest req(get_context(), get_user(), parent->bucket_name(),
-                         dir_name, bl);
+      RGWPutObjRequest req(get_context(), rgwlib.get_store()->get_user(user.user_id),
+                          parent->bucket_name(), dir_name, bl);
 
       /* save attrs */
       req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
@@ -613,10 +713,12 @@ namespace rgw {
 
     rgw_file_handle *lfh;
     rc = rgw_lookup(get_fs(), parent->get_fh(), name, &lfh,
+                   nullptr /* st */, 0 /* mask */,
                    RGW_LOOKUP_FLAG_NONE);
     if (! rc) {
       /* conflict! */
       rc = rgw_fh_rele(get_fs(), lfh, RGW_FH_RELE_FLAG_NONE);
+      // ignore return code
       return MkObjResult{nullptr, -EEXIST};
     }
 
@@ -629,7 +731,8 @@ namespace rgw {
 
     /* create it */
     buffer::list bl;
-    RGWPutObjRequest req(cct, get_user(), parent->bucket_name(), obj_name, bl);
+    RGWPutObjRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                        parent->bucket_name(), obj_name, bl);
     MkObjResult mkr{nullptr, -EINVAL};
 
     rc = rgwlib.get_fe()->execute_req(&req);
@@ -652,7 +755,11 @@ namespace rgw {
          parent->set_ctime(real_clock::to_timespec(t));
        }
         if (st)
-          (void) rgw_fh->stat(st);
+         (void) rgw_fh->stat(st, RGWFileHandle::FLAG_LOCKED);
+
+        rgw_fh->set_etag(*(req.get_attr(RGW_ATTR_ETAG)));
+        rgw_fh->set_acls(*(req.get_attr(RGW_ATTR_ACL))); 
+
        get<0>(mkr) = rgw_fh;
        rgw_fh->mtx.unlock();
       } else
@@ -660,10 +767,113 @@ namespace rgw {
     }
 
     get<1>(mkr) = rc;
+   
+    /* case like : quota exceed will be considered as fail too*/
+    if(rc2 < 0)
+       get<1>(mkr) = rc2;
 
     return mkr;
   } /* RGWLibFS::create */
 
+  MkObjResult RGWLibFS::symlink(RGWFileHandle* parent, const char *name,
+            const char* link_path, struct stat *st, uint32_t mask, uint32_t flags)
+  {
+    int rc, rc2;
+
+    using std::get;
+
+    rgw_file_handle *lfh;
+    rc = rgw_lookup(get_fs(), parent->get_fh(), name, &lfh,
+                   nullptr /* st */, 0 /* mask */,
+                    RGW_LOOKUP_FLAG_NONE);
+    if (! rc) {
+      /* conflict! */
+      rc = rgw_fh_rele(get_fs(), lfh, RGW_FH_RELE_FLAG_NONE);
+      // ignore return code
+      return MkObjResult{nullptr, -EEXIST};
+    }
+
+    MkObjResult mkr{nullptr, -EINVAL};
+    LookupFHResult fhr;
+    RGWFileHandle* rgw_fh = nullptr;
+    buffer::list ux_key, ux_attrs;
+
+    fhr = lookup_fh(parent, name,
+      RGWFileHandle::FLAG_CREATE|
+      RGWFileHandle::FLAG_SYMBOLIC_LINK|
+      RGWFileHandle::FLAG_LOCK);
+    rgw_fh = get<0>(fhr);
+    if (rgw_fh) {
+      rgw_fh->create_stat(st, mask);
+      rgw_fh->set_times(real_clock::now());
+      /* save attrs */
+      rgw_fh->encode_attrs(ux_key, ux_attrs);
+      if (st)
+        rgw_fh->stat(st);
+      get<0>(mkr) = rgw_fh;
+    } else {
+      get<1>(mkr) = -EIO;
+      return mkr;
+    }
+
+    /* need valid S3 name (characters, length <= 1024, etc) */
+    rc = valid_fs_object_name(name);
+    if (rc != 0) {
+      rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
+      fh_cache.remove(rgw_fh->fh.fh_hk.object, rgw_fh,
+        RGWFileHandle::FHCache::FLAG_LOCK);
+      rgw_fh->mtx.unlock();
+      unref(rgw_fh);
+      get<0>(mkr) = nullptr;
+      get<1>(mkr) = rc;
+      return mkr;
+    }
+
+    string obj_name = std::string(name);
+    /* create an object representing the directory */
+    buffer::list bl;
+
+    /* XXXX */
+#if 0
+    bl.push_back(
+      buffer::create_static(len, static_cast<char*>(buffer)));
+#else
+
+    bl.push_back(
+      buffer::copy(link_path, strlen(link_path)));
+#endif
+
+    RGWPutObjRequest req(get_context(), rgwlib.get_store()->get_user(user.user_id),
+                        parent->bucket_name(), obj_name, bl);
+
+    /* save attrs */
+    req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
+    req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
+
+    rc = rgwlib.get_fe()->execute_req(&req);
+    rc2 = req.get_ret();
+    if (! ((rc == 0) &&
+        (rc2 == 0))) {
+      /* op failed */
+      rgw_fh->flags |= RGWFileHandle::FLAG_DELETED;
+      rgw_fh->mtx.unlock(); /* !LOCKED */
+      unref(rgw_fh);
+      get<0>(mkr) = nullptr;
+      /* fixup rc */
+      if (!rc)
+        rc = rc2;
+    } else {
+      real_time t = real_clock::now();
+      parent->set_mtime(real_clock::to_timespec(t));
+      parent->set_ctime(real_clock::to_timespec(t));
+      rgw_fh->mtx.unlock(); /* !LOCKED */
+    }
+
+    get<1>(mkr) = rc;
+
+    return mkr;
+  } /* RGWLibFS::symlink */
+
   int RGWLibFS::getattr(RGWFileHandle* rgw_fh, struct stat* st)
   {
     switch(rgw_fh->fh.fh_type) {
@@ -676,7 +886,7 @@ namespace rgw {
     default:
       break;
     };
-
+    /* if rgw_fh is a directory, mtime will be advanced */
     return rgw_fh->stat(st);
   } /* RGWLibFS::getattr */
 
@@ -685,6 +895,8 @@ namespace rgw {
   {
     int rc, rc2;
     buffer::list ux_key, ux_attrs;
+    buffer::list etag = rgw_fh->get_etag();
+    buffer::list acls = rgw_fh->get_acls();
 
     lock_guard guard(rgw_fh->mtx);
 
@@ -706,7 +918,8 @@ namespace rgw {
       obj_name += "/";
     }
 
-    RGWSetAttrsRequest req(cct, get_user(), rgw_fh->bucket_name(), obj_name);
+    RGWSetAttrsRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                          rgw_fh->bucket_name(), obj_name);
 
     rgw_fh->create_stat(st, mask);
     rgw_fh->encode_attrs(ux_key, ux_attrs);
@@ -714,6 +927,8 @@ namespace rgw {
     /* save attrs */
     req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
     req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
+    req.emplace_attr(RGW_ATTR_ETAG, std::move(etag));
+    req.emplace_attr(RGW_ATTR_ACL, std::move(acls));
 
     rc = rgwlib.get_fe()->execute_req(&req);
     rc2 = req.get_ret();
@@ -721,7 +936,7 @@ namespace rgw {
     if (rc == -ENOENT) {
       /* special case:  materialize placeholder dir */
       buffer::list bl;
-      RGWPutObjRequest req(get_context(), get_user(), rgw_fh->bucket_name(),
+      RGWPutObjRequest req(get_context(), rgwlib.get_store()->get_user(user.user_id), rgw_fh->bucket_name(),
                           obj_name, bl);
 
       rgw_fh->encode_attrs(ux_key, ux_attrs); /* because std::moved */
@@ -743,7 +958,242 @@ namespace rgw {
     return 0;
   } /* RGWLibFS::setattr */
 
-  /* called under rgw_fh->mtx held */
+  static inline std::string prefix_xattr_keystr(const rgw_xattrstr& key) {
+    std::string keystr;
+    keystr.reserve(sizeof(RGW_ATTR_META_PREFIX) + key.len);
+    keystr += string{RGW_ATTR_META_PREFIX};
+    keystr += string{key.val, key.len};
+    return keystr;
+  }
+
+  static inline std::string_view unprefix_xattr_keystr(const std::string& key)
+  {
+    std::string_view svk{key};
+    auto pos = svk.find(RGW_ATTR_META_PREFIX);
+    if (pos == std::string_view::npos) {
+      return std::string_view{""};
+    } else if (pos == 0) {
+      svk.remove_prefix(sizeof(RGW_ATTR_META_PREFIX)-1);
+    }
+    return svk;
+  }
+
+  int RGWLibFS::getxattrs(RGWFileHandle* rgw_fh, rgw_xattrlist *attrs,
+                         rgw_getxattr_cb cb, void *cb_arg,
+                         uint32_t flags)
+  {
+    /* cannot store on fs_root, should not on buckets? */
+    if ((rgw_fh->is_bucket()) ||
+       (rgw_fh->is_root()))  {
+      return -EINVAL;
+    }
+
+    int rc, rc2, rc3;
+    string obj_name{rgw_fh->relative_object_name2()};
+
+    RGWGetAttrsRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                          rgw_fh->bucket_name(), obj_name);
+
+    for (uint32_t ix = 0; ix < attrs->xattr_cnt; ++ix) {
+      auto& xattr = attrs->xattrs[ix];
+
+      /* pass exposed attr keys as given, else prefix */
+      std::string k = is_exposed_attr(xattr.key)
+       ? std::string{xattr.key.val, xattr.key.len}
+       : prefix_xattr_keystr(xattr.key);
+
+      req.emplace_key(std::move(k));
+    }
+
+    if (ldlog_p1(get_context(), ceph_subsys_rgw, 15)) {
+      lsubdout(get_context(), rgw, 15)
+       << __func__
+       << " get keys for: "
+       << rgw_fh->object_name()
+       << " keys:"
+       << dendl;
+      for (const auto& attr: req.get_attrs()) {
+       lsubdout(get_context(), rgw, 15)
+         << "\tkey: " << attr.first << dendl;
+      }
+    }
+
+    rc = rgwlib.get_fe()->execute_req(&req);
+    rc2 = req.get_ret();
+    rc3 = ((rc == 0) && (rc2 == 0)) ? 0 : -EIO;
+
+    /* call back w/xattr data */
+    if (rc3 == 0) {
+      const auto& attrs = req.get_attrs();
+      for (const auto& attr : attrs) {
+
+       if (!attr.second.has_value())
+         continue;
+
+       const auto& k = attr.first;
+       const auto& v = attr.second.value();
+
+       /* return exposed attr keys as given, else unprefix --
+        * yes, we could have memoized the exposed check, but
+        * to be efficient it would need to be saved with
+        * RGWGetAttrs::attrs, I think */
+       std::string_view svk =
+         is_exposed_attr(rgw_xattrstr{const_cast<char*>(k.c_str()),
+                                      uint32_t(k.length())})
+         ? k
+         : unprefix_xattr_keystr(k);
+
+       /* skip entries not matching prefix */
+       if (svk.empty())
+         continue;
+
+       rgw_xattrstr xattr_k = { const_cast<char*>(svk.data()),
+                                uint32_t(svk.length())};
+       rgw_xattrstr xattr_v =
+         {const_cast<char*>(const_cast<buffer::list&>(v).c_str()),
+          uint32_t(v.length())};
+       rgw_xattr xattr = { xattr_k, xattr_v };
+       rgw_xattrlist xattrlist = { &xattr, 1 };
+
+       cb(&xattrlist, cb_arg, RGW_GETXATTR_FLAG_NONE);
+      }
+    }
+
+    return rc3;
+  } /* RGWLibFS::getxattrs */
+
+  int RGWLibFS::lsxattrs(
+    RGWFileHandle* rgw_fh, rgw_xattrstr *filter_prefix, rgw_getxattr_cb cb,
+    void *cb_arg, uint32_t flags)
+  {
+    /* cannot store on fs_root, should not on buckets? */
+    if ((rgw_fh->is_bucket()) ||
+       (rgw_fh->is_root()))  {
+      return -EINVAL;
+    }
+
+    int rc, rc2, rc3;
+    string obj_name{rgw_fh->relative_object_name2()};
+
+    RGWGetAttrsRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                          rgw_fh->bucket_name(), obj_name);
+
+    rc = rgwlib.get_fe()->execute_req(&req);
+    rc2 = req.get_ret();
+    rc3 = ((rc == 0) && (rc2 == 0)) ? 0 : -EIO;
+
+    /* call back w/xattr data--check for eof */
+    if (rc3 == 0) {
+      const auto& keys = req.get_attrs();
+      for (const auto& k : keys) {
+
+       /* return exposed attr keys as given, else unprefix */
+       std::string_view svk =
+         is_exposed_attr(rgw_xattrstr{const_cast<char*>(k.first.c_str()),
+                                      uint32_t(k.first.length())})
+         ? k.first
+         : unprefix_xattr_keystr(k.first);
+
+       /* skip entries not matching prefix */
+       if (svk.empty())
+         continue;
+
+       rgw_xattrstr xattr_k = { const_cast<char*>(svk.data()),
+                                uint32_t(svk.length())};
+       rgw_xattrstr xattr_v = { nullptr, 0 };
+       rgw_xattr xattr = { xattr_k, xattr_v };
+       rgw_xattrlist xattrlist = { &xattr, 1 };
+
+       auto cbr = cb(&xattrlist, cb_arg, RGW_LSXATTR_FLAG_NONE);
+       if (cbr & RGW_LSXATTR_FLAG_STOP)
+         break;
+      }
+    }
+
+    return rc3;
+  } /* RGWLibFS::lsxattrs */
+
+  int RGWLibFS::setxattrs(RGWFileHandle* rgw_fh, rgw_xattrlist *attrs,
+                         uint32_t flags)
+  {
+    /* cannot store on fs_root, should not on buckets? */
+    if ((rgw_fh->is_bucket()) ||
+       (rgw_fh->is_root()))  {
+      return -EINVAL;
+    }
+
+    int rc, rc2;
+    string obj_name{rgw_fh->relative_object_name2()};
+
+    RGWSetAttrsRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                          rgw_fh->bucket_name(), obj_name);
+
+    for (uint32_t ix = 0; ix < attrs->xattr_cnt; ++ix) {
+      auto& xattr = attrs->xattrs[ix];
+      buffer::list attr_bl;
+      /* don't allow storing at RGW_ATTR_META_PREFIX */
+      if (! (xattr.key.len > 0))
+       continue;
+
+      /* reject lexical match with any exposed attr */
+      if (is_exposed_attr(xattr.key))
+       continue;
+
+      string k = prefix_xattr_keystr(xattr.key);
+      attr_bl.append(xattr.val.val, xattr.val.len);
+      req.emplace_attr(k.c_str(), std::move(attr_bl));
+    }
+
+    /* don't send null requests */
+    if (! (req.get_attrs().size() > 0)) {
+      return -EINVAL;
+    }
+
+    rc = rgwlib.get_fe()->execute_req(&req);
+    rc2 = req.get_ret();
+
+    return (((rc == 0) && (rc2 == 0)) ? 0 : -EIO);
+
+  } /* RGWLibFS::setxattrs */
+
+  int RGWLibFS::rmxattrs(RGWFileHandle* rgw_fh, rgw_xattrlist* attrs,
+                        uint32_t flags)
+  {
+    /* cannot store on fs_root, should not on buckets? */
+    if ((rgw_fh->is_bucket()) ||
+       (rgw_fh->is_root()))  {
+      return -EINVAL;
+    }
+
+    int rc, rc2;
+    string obj_name{rgw_fh->relative_object_name2()};
+
+    RGWRMAttrsRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                         rgw_fh->bucket_name(), obj_name);
+
+    for (uint32_t ix = 0; ix < attrs->xattr_cnt; ++ix) {
+      auto& xattr = attrs->xattrs[ix];
+      /* don't allow storing at RGW_ATTR_META_PREFIX */
+      if (! (xattr.key.len > 0)) {
+       continue;
+      }
+      string k = prefix_xattr_keystr(xattr.key);
+      req.emplace_key(std::move(k));
+    }
+
+    /* don't send null requests */
+    if (! (req.get_attrs().size() > 0)) {
+      return -EINVAL;
+    }
+
+    rc = rgwlib.get_fe()->execute_req(&req);
+    rc2 = req.get_ret();
+
+    return (((rc == 0) && (rc2 == 0)) ? 0 : -EIO);
+
+  } /* RGWLibFS::rmxattrs */
+
+  /* called with rgw_fh->mtx held */
   void RGWLibFS::update_fh(RGWFileHandle *rgw_fh)
   {
     int rc, rc2;
@@ -760,7 +1210,8 @@ namespace rgw {
       << " update old versioned fh : " << obj_name
       << dendl;
 
-    RGWSetAttrsRequest req(cct, get_user(), rgw_fh->bucket_name(), obj_name);
+    RGWSetAttrsRequest req(cct, rgwlib.get_store()->get_user(user.user_id),
+                          rgw_fh->bucket_name(), obj_name);
 
     rgw_fh->encode_attrs(ux_key, ux_attrs);
 
@@ -786,10 +1237,10 @@ namespace rgw {
     {
       RGWLibFS* fs;
     public:
-      ObjUnref(RGWLibFS* _fs) : fs(_fs) {}
+      explicit ObjUnref(RGWLibFS* _fs) : fs(_fs) {}
       void operator()(RGWFileHandle* fh) const {
        lsubdout(fs->get_context(), rgw, 5)
-         << __func__
+         << __PRETTY_FUNCTION__
          << fh->name
          << " before ObjUnref refs=" << fh->get_refcnt()
          << dendl;
@@ -804,6 +1255,15 @@ namespace rgw {
     rele();
   } /* RGWLibFS::close */
 
+  inline std::ostream& operator<<(std::ostream &os, fh_key const &fhk) {
+    os << "<fh_key: bucket=";
+    os << fhk.fh_hk.bucket;
+    os << "; object=";
+    os << fhk.fh_hk.object;
+    os << ">";
+    return os;
+  }
+
   inline std::ostream& operator<<(std::ostream &os, struct timespec const &ts) {
       os << "<timespec: tv_sec=";
       os << ts.tv_sec;
@@ -855,7 +1315,12 @@ namespace rgw {
        << dendl;
       {
        lock_guard guard(state.mtx); /* LOCKED */
-       /* just return if no events */
+       lsubdout(get_context(), rgw, 15)
+         << "GC: processing"
+         << " count=" << events.size()
+         << " events"
+         << dendl;
+        /* just return if no events */
        if (events.empty()) {
          return;
        }
@@ -969,29 +1434,39 @@ namespace rgw {
     }
   }
 
+  fh_key RGWFileHandle::make_fhk(const std::string& name)
+  {
+    std::string tenant = get_fs()->get_user()->user_id.to_str();
+    if (depth == 0) {
+      /* S3 bucket -- assert mount-at-bucket case reaches here */
+      return fh_key(name, name, tenant);
+    } else {
+      std::string key_name = make_key_name(name.c_str());
+      return fh_key(fhk.fh_hk.bucket, key_name.c_str(), tenant);
+    }
+  }
+
   void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
                                   ceph::buffer::list& ux_attrs1)
   {
+    using ceph::encode;
     fh_key fhk(this->fh.fh_hk);
-    rgw::encode(fhk, ux_key1);
-    rgw::encode(*this, ux_attrs1);
+    encode(fhk, ux_key1);
+    encode(*this, ux_attrs1);
   } /* RGWFileHandle::encode_attrs */
 
   DecodeAttrsResult RGWFileHandle::decode_attrs(const ceph::buffer::list* ux_key1,
                                                 const ceph::buffer::list* ux_attrs1)
   {
+    using ceph::decode;
     DecodeAttrsResult dar { false, false };
     fh_key fhk;
-    auto bl_iter_key1  = const_cast<buffer::list*>(ux_key1)->begin();
-    rgw::decode(fhk, bl_iter_key1);
-    if (fhk.version >= 2) {
-      assert(this->fh.fh_hk == fhk.fh_hk);
-    } else {
-      get<0>(dar) = true;
-    }
+    auto bl_iter_key1 = ux_key1->cbegin();
+    decode(fhk, bl_iter_key1);
+    get<0>(dar) = true;
 
-    auto bl_iter_unix1 = const_cast<buffer::list*>(ux_attrs1)->begin();
-    rgw::decode(*this, bl_iter_unix1);
+    auto bl_iter_unix1 = ux_attrs1->cbegin();
+    decode(*this, bl_iter_unix1);
     if (this->state.version < 2) {
       get<1>(dar) = true;
     }
@@ -999,10 +1474,19 @@ namespace rgw {
     return dar;
   } /* RGWFileHandle::decode_attrs */
 
-  bool RGWFileHandle::reclaim() {
+  bool RGWFileHandle::reclaim(const cohort::lru::ObjectFactory* newobj_fac) {
     lsubdout(fs->get_context(), rgw, 17)
       << __func__ << " " << *this
       << dendl;
+    auto factory = dynamic_cast<const RGWFileHandle::Factory*>(newobj_fac);
+    if (factory == nullptr) {
+      return false;
+    }
+    /* make sure the reclaiming object is the same partiton with newobject factory,
+     * then we can recycle the object, and replace with newobject */
+    if (!fs->fh_cache.is_same_partition(factory->fhk.fh_hk.object, fh.fh_hk.object)) {
+      return false;
+    }
     /* in the non-delete case, handle may still be in handle table */
     if (fh_hook.is_linked()) {
       /* in this case, we are being called from a context which holds
@@ -1017,7 +1501,9 @@ namespace rgw {
     if (unlikely(! is_dir()))
       return false;
 
-    RGWRMdirCheck req(fs->get_context(), fs->get_user(), this);
+    RGWRMdirCheck req(fs->get_context(),
+                     rgwlib.get_store()->get_user(fs->get_user()->user_id),
+                     this);
     int rc = rgwlib.get_fe()->execute_req(&req);
     if (! rc) {
       return req.valid && req.has_children;
@@ -1049,6 +1535,11 @@ namespace rgw {
     struct timespec now;
     CephContext* cct = fs->get_context();
 
+    lsubdout(cct, rgw, 10)
+      << __func__ << " readdir called on "
+      << object_name()
+      << dendl;
+
     directory* d = get<directory>(&variant_type);
     if (d) {
       (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
@@ -1057,15 +1548,18 @@ namespace rgw {
     }
 
     bool initial_off;
+    char* mk{nullptr};
+
     if (likely(!! get<const char*>(&offset))) {
-      initial_off = ! get<const char*>(offset);
+      mk = const_cast<char*>(get<const char*>(offset));
+      initial_off = !mk;
     } else {
       initial_off = (*get<uint64_t*>(offset) == 0);
     }
 
     if (is_root()) {
-      RGWListBucketsRequest req(cct, fs->get_user(), this, rcb, cb_arg,
-                               offset);
+      RGWListBucketsRequest req(cct, rgwlib.get_store()->get_user(fs->get_user()->user_id),
+                               this, rcb, cb_arg, offset);
       rc = rgwlib.get_fe()->execute_req(&req);
       if (! rc) {
        (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
@@ -1075,12 +1569,10 @@ namespace rgw {
          set_nlink(2);
        inc_nlink(req.d_count);
        *eof = req.eof();
-       event ev(event::type::READDIR, get_key(), state.atime);
-       lock_guard sguard(fs->state.mtx);
-       fs->state.push_event(ev);
       }
     } else {
-      RGWReaddirRequest req(cct, fs->get_user(), this, rcb, cb_arg, offset);
+      RGWReaddirRequest req(cct, rgwlib.get_store()->get_user(fs->get_user()->user_id),
+                           this, rcb, cb_arg, offset);
       rc = rgwlib.get_fe()->execute_req(&req);
       if (! rc) {
        (void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */
@@ -1090,12 +1582,13 @@ namespace rgw {
          set_nlink(2);
        inc_nlink(req.d_count);
        *eof = req.eof();
-       event ev(event::type::READDIR, get_key(), state.atime);
-       lock_guard sguard(fs->state.mtx);
-       fs->state.push_event(ev);
       }
     }
 
+    event ev(event::type::READDIR, get_key(), state.atime);
+    lock_guard sguard(fs->state.mtx);
+    fs->state.push_event(ev);
+
     lsubdout(fs->get_context(), rgw, 15)
       << __func__
       << " final link count=" << state.nlink
@@ -1139,6 +1632,7 @@ namespace rgw {
          << __func__
          << " " << object_name()
          << " non-0 initial write position " << off
+         << " (mounting with -o sync required)"
          << dendl;
        return -EIO;
       }
@@ -1146,8 +1640,9 @@ namespace rgw {
       /* start */
       std::string object_name = relative_object_name();
       f->write_req =
-       new RGWWriteRequest(fs->get_context(), fs->get_user(), this,
-                           bucket_name(), object_name);
+       new RGWWriteRequest(rgwlib.get_store(),
+                           rgwlib.get_store()->get_user(fs->get_user()->user_id),
+                           this, bucket_name(), object_name);
       rc = rgwlib.get_fe()->start_req(f->write_req);
       if (rc < 0) {
        lsubdout(fs->get_context(), rgw, 5)
@@ -1272,6 +1767,27 @@ namespace rgw {
     }
   }
 
+  void RGWFileHandle::advance_mtime(uint32_t flags) {
+    /* intended for use on directories, fast-forward mtime so as to
+     * ensure a new, higher value for the change attribute */
+    unique_lock uniq(mtx, std::defer_lock);
+    if (likely(! (flags & RGWFileHandle::FLAG_LOCKED))) {
+      uniq.lock();
+    }
+
+    /* advance mtime only if stored mtime is older than the
+     * configured namespace expiration */
+    auto now = real_clock::now();
+    auto cmptime = state.mtime;
+    cmptime.tv_sec +=
+      fs->get_context()->_conf->rgw_nfs_namespace_expire_secs;
+    if (cmptime < real_clock::to_timespec(now)) {
+      /* sets ctime as well as mtime, to avoid masking updates should
+       * ctime inexplicably hold a higher value */
+      set_times(now);
+    }
+  }
+
   void RGWFileHandle::invalidate() {
     RGWLibFS *fs = get_fs();
     if (fs->invalidate_cb) {
@@ -1280,29 +1796,32 @@ namespace rgw {
   }
 
   int RGWWriteRequest::exec_start() {
-    struct req_state* s = get_state();
+    struct req_state* state = get_state();
+
+    /* Object needs a bucket from this point */
+    state->object->set_bucket(state->bucket.get());
 
     auto compression_type =
-      get_store()->get_zone_params().get_compression_type(
-       s->bucket_info.placement_rule);
+      get_store()->svc()->zone->get_zone_params().get_compression_type(
+       state->bucket->get_placement_rule());
 
     /* not obviously supportable */
-    assert(! dlo_manifest);
-    assert(! slo_info);
+    ceph_assert(! dlo_manifest);
+    ceph_assert(! slo_info);
 
     perfcounter->inc(l_rgw_put);
     op_ret = -EINVAL;
 
-    if (s->object.empty()) {
-      ldout(s->cct, 0) << __func__ << " called on empty object" << dendl;
+    if (state->object->empty()) {
+      ldout(state->cct, 0) << __func__ << " called on empty object" << dendl;
       goto done;
     }
 
-    op_ret = get_params();
+    op_ret = get_params(null_yield);
     if (op_ret < 0)
       goto done;
 
-    op_ret = get_system_versioning_params(s, &olh_epoch, &version_id);
+    op_ret = get_system_versioning_params(state, &olh_epoch, &version_id);
     if (op_ret < 0) {
       goto done;
     }
@@ -1311,26 +1830,41 @@ namespace rgw {
     /* early quota check skipped--we don't have size yet */
     /* skipping user-supplied etag--we might have one in future, but
      * like data it and other attrs would arrive after open */
-    processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
-                                &multipart);
-    op_ret = processor->prepare(get_store(), NULL);
+
+    aio.emplace(state->cct->_conf->rgw_put_obj_min_window_size);
+
+    if (state->bucket->versioning_enabled()) {
+      if (!version_id.empty()) {
+        state->object->set_instance(version_id);
+      } else {
+       state->object->gen_rand_obj_instance_name();
+        version_id = state->object->get_instance();
+      }
+    }
+    processor.emplace(&*aio, get_store(), state->bucket.get(),
+                      &state->dest_placement,
+                      state->bucket_owner.get_id(),
+                      *static_cast<RGWObjectCtx *>(state->obj_ctx),
+                      state->object->clone(), olh_epoch, state->req_id,
+                     this, state->yield);
+
+    op_ret = processor->prepare(state->yield);
     if (op_ret < 0) {
-      ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
+      ldout(state->cct, 20) << "processor->prepare() returned ret=" << op_ret
                        << dendl;
       goto done;
     }
-
-    filter = processor;
+    filter = &*processor;
     if (compression_type != "none") {
-      plugin = Compressor::create(s->cct, compression_type);
-    if (! plugin) {
-      ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
-                       << compression_type << dendl;
-    } else {
-      compressor.emplace(s->cct, plugin, filter);
-      filter = &*compressor;
+      plugin = Compressor::create(state->cct, compression_type);
+      if (! plugin) {
+        ldout(state->cct, 1) << "Cannot load plugin for rgw_compression_type "
+                         << compression_type << dendl;
+      } else {
+        compressor.emplace(state->cct, plugin, filter);
+        filter = &*compressor;
+      }
     }
-  }
 
   done:
     return op_ret;
@@ -1338,67 +1872,31 @@ namespace rgw {
 
   int RGWWriteRequest::exec_continue()
   {
-    struct req_state* s = get_state();
+    struct req_state* state = get_state();
     op_ret = 0;
 
     /* check guards (e.g., contig write) */
-    if (eio)
+    if (eio) {
+      ldout(state->cct, 5)
+        << " chunks arrived in wrong order"
+        << " (mounting with -o sync required)"
+        << dendl;
+      return -EIO;
+    }
+
+    op_ret = state->bucket->check_quota(user_quota, bucket_quota, real_ofs, null_yield, true);
+    /* max_size exceed */
+    if (op_ret < 0)
       return -EIO;
 
     size_t len = data.length();
     if (! len)
       return 0;
 
-    /* XXX we are currently synchronous--supplied data buffers cannot
-     * be used after the caller returns  */
-    bool need_to_wait = true;
-    bufferlist orig_data;
-
-    if (need_to_wait) {
-      orig_data = data;
-    }
-    hash.Update((const byte *)data.c_str(), data.length());
-    op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
+    hash.Update((const unsigned char *)data.c_str(), data.length());
+    op_ret = filter->process(std::move(data), ofs);
     if (op_ret < 0) {
-      if (!need_to_wait || op_ret != -EEXIST) {
-       ldout(s->cct, 20) << "processor->thottle_data() returned ret="
-                         << op_ret << dendl;
-       goto done;
-      }
-
-      ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
-
-      /* restore original data */
-      data.swap(orig_data);
-
-      /* restart processing with different oid suffix */
-      dispose_processor(processor);
-      processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
-                                  &multipart);
-      filter = processor;
-
-      string oid_rand;
-      char buf[33];
-      gen_rand_alphanumeric(get_store()->ctx(), buf, sizeof(buf) - 1);
-      oid_rand.append(buf);
-
-      op_ret = processor->prepare(get_store(), &oid_rand);
-      if (op_ret < 0) {
-       ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
-                        << op_ret << dendl;
-       goto done;
-      }
-
-      /* restore compression filter, if any */
-      if (compressor) {
-       compressor.emplace(s->cct, plugin, filter);
-       filter = &*compressor;
-      }
-
-      op_ret = put_data_and_throttle(filter, data, ofs, false);
-      if (op_ret < 0) {
-       goto done;
-      }
+      goto done;
     }
     bytes_written += len;
 
@@ -1412,24 +1910,24 @@ namespace rgw {
     map<string, string>::iterator iter;
     char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
     unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
-    struct req_state* s = get_state();
+    struct req_state* state = get_state();
 
     size_t osize = rgw_fh->get_size();
     struct timespec octime = rgw_fh->get_ctime();
     struct timespec omtime = rgw_fh->get_mtime();
     real_time appx_t = real_clock::now();
 
-    s->obj_size = bytes_written;
-    perfcounter->inc(l_rgw_put_b, s->obj_size);
+    state->obj_size = bytes_written;
+    perfcounter->inc(l_rgw_put_b, state->obj_size);
 
-    op_ret = get_store()->check_quota(s->bucket_owner.get_id(), s->bucket,
-                                     user_quota, bucket_quota, s->obj_size);
+    // flush data in filters
+    op_ret = filter->process({}, state->obj_size);
     if (op_ret < 0) {
       goto done;
     }
 
-    op_ret = get_store()->check_bucket_shards(s->bucket_info, s->bucket,
-                                             bucket_quota);
+    op_ret = state->bucket->check_quota(user_quota, bucket_quota, state->obj_size, null_yield, true);
+    /* max_size exceed */
     if (op_ret < 0) {
       goto done;
     }
@@ -1440,11 +1938,11 @@ namespace rgw {
       bufferlist tmp;
       RGWCompressionInfo cs_info;
       cs_info.compression_type = plugin->get_type_name();
-      cs_info.orig_size = s->obj_size;
+      cs_info.orig_size = state->obj_size;
       cs_info.blocks = std::move(compressor->get_compression_blocks());
-      ::encode(cs_info, tmp);
+      encode(cs_info, tmp);
       attrs[RGW_ATTR_COMPRESSION] = tmp;
-      ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
+      ldpp_dout(this, 20) << "storing " << RGW_ATTR_COMPRESSION
                        << " with type=" << cs_info.compression_type
                        << ", orig_size=" << cs_info.orig_size
                        << ", blocks=" << cs_info.blocks.size() << dendl;
@@ -1468,14 +1966,14 @@ namespace rgw {
     emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
     emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
 
-    for (iter = s->generic_attrs.begin(); iter != s->generic_attrs.end();
+    for (iter = state->generic_attrs.begin(); iter != state->generic_attrs.end();
         ++iter) {
       buffer::list& attrbl = attrs[iter->first];
       const string& val = iter->second;
       attrbl.append(val.c_str(), val.size() + 1);
     }
 
-    op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
+    op_ret = rgw_get_request_metadata(this, state->cct, state->info, attrs);
     if (op_ret < 0) {
       goto done;
     }
@@ -1486,13 +1984,15 @@ namespace rgw {
      * processing any input from user in order to prohibit overwriting. */
     if (unlikely(!! slo_info)) {
       buffer::list slo_userindicator_bl;
-      ::encode("True", slo_userindicator_bl);
+      using ceph::encode;
+      encode("True", slo_userindicator_bl);
       emplace_attr(RGW_ATTR_SLO_UINDICATOR, std::move(slo_userindicator_bl));
     }
 
-    op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
+    op_ret = processor->complete(state->obj_size, etag, &mtime, real_time(), attrs,
                                  (delete_at ? *delete_at : real_time()),
-                                if_match, if_nomatch);
+                                if_match, if_nomatch, nullptr, nullptr, nullptr,
+                                state->yield);
     if (op_ret != 0) {
       /* revert attr updates */
       rgw_fh->set_mtime(omtime);
@@ -1501,9 +2001,7 @@ namespace rgw {
     }
 
   done:
-    dispose_processor(processor);
-    perfcounter->tinc(l_rgw_put_lat,
-                     (ceph_clock_now() - s->time));
+    perfcounter->tinc(l_rgw_put_lat, state->time_elapsed());
     return op_ret;
   } /* exec_finish */
 
@@ -1534,9 +2032,10 @@ void rgwfile_version(int *major, int *minor, int *extra)
   /* stash access data for "mount" */
   RGWLibFS* new_fs = new RGWLibFS(static_cast<CephContext*>(rgw), uid, acc_key,
                                  sec_key, "/");
-  assert(new_fs);
+  ceph_assert(new_fs);
 
-  rc = new_fs->authorize(rgwlib.get_store());
+  const DoutPrefix dp(rgwlib.get_store()->ctx(), dout_subsys, "rgw mount: ");
+  rc = new_fs->authorize(&dp, rgwlib.get_store());
   if (rc != 0) {
     delete new_fs;
     return -EINVAL;
@@ -1565,9 +2064,10 @@ int rgw_mount2(librgw_t rgw, const char *uid, const char *acc_key,
   /* stash access data for "mount" */
   RGWLibFS* new_fs = new RGWLibFS(static_cast<CephContext*>(rgw), uid, acc_key,
                                  sec_key, root);
-  assert(new_fs);
+  ceph_assert(new_fs);
 
-  rc = new_fs->authorize(rgwlib.get_store());
+  const DoutPrefix dp(rgwlib.get_store()->ctx(), dout_subsys, "rgw mount2: ");
+  rc = new_fs->authorize(&dp, rgwlib.get_store());
   if (rc != 0) {
     delete new_fs;
     return -EINVAL;
@@ -1618,7 +2118,9 @@ int rgw_statfs(struct rgw_fs *rgw_fs,
   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
   struct rados_cluster_stat_t stats;
 
-  RGWGetClusterStatReq req(fs->get_context(), fs->get_user(), stats);
+  RGWGetClusterStatReq req(fs->get_context(),
+                          rgwlib.get_store()->get_user(fs->get_user()->user_id),
+                          stats);
   int rc = rgwlib.get_fe()->execute_req(&req);
   if (rc < 0) {
     lderr(fs->get_context()) << "ERROR: getting total cluster usage"
@@ -1671,6 +2173,35 @@ int rgw_create(struct rgw_fs *rgw_fs, struct rgw_file_handle *parent_fh,
   return get<1>(fhr);
 } /* rgw_create */
 
+/*
+  create a symbolic link
+ */
+int rgw_symlink(struct rgw_fs *rgw_fs, struct rgw_file_handle *parent_fh,
+        const char *name, const char *link_path, struct stat *st, uint32_t mask,
+        struct rgw_file_handle **fh, uint32_t posix_flags,
+        uint32_t flags)
+{
+  using std::get;
+
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* parent = get_rgwfh(parent_fh);
+
+  if ((! parent) ||
+      (parent->is_root()) ||
+      (parent->is_file())) {
+    /* bad parent */
+    return -EINVAL;
+  }
+
+  MkObjResult fhr = fs->symlink(parent, name, link_path, st, mask, flags);
+  RGWFileHandle *nfh = get<0>(fhr); // nullptr if !success
+
+  if (nfh)
+    *fh = nfh->get_fh();
+
+  return get<1>(fhr);
+} /* rgw_symlink */
+
 /*
   create a new directory
 */
@@ -1731,7 +2262,8 @@ int rgw_unlink(struct rgw_fs *rgw_fs, struct rgw_file_handle *parent_fh,
 */
 int rgw_lookup(struct rgw_fs *rgw_fs,
              struct rgw_file_handle *parent_fh, const char* path,
-             struct rgw_file_handle **fh, uint32_t flags)
+             struct rgw_file_handle **fh,
+             struct stat *st, uint32_t mask, uint32_t flags)
 {
   //CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
   RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
@@ -1763,18 +2295,33 @@ int rgw_lookup(struct rgw_fs *rgw_fs,
     if (unlikely((strcmp(path, "..") == 0))) {
       rgw_fh = parent;
       lsubdout(fs->get_context(), rgw, 17)
-       << __func__ << "BANG"<< *rgw_fh
+       << __func__ << " BANG"<< *rgw_fh
        << dendl;
       fs->ref(rgw_fh);
     } else {
-      /* lookup in a readdir callback */
       enum rgw_fh_type fh_type = fh_type_of(flags);
 
       uint32_t sl_flags = (flags & RGW_LOOKUP_FLAG_RCB)
-       ? RGWFileHandle::FLAG_NONE
+       ? RGWFileHandle::FLAG_IN_CB
        : RGWFileHandle::FLAG_EXACT_MATCH;
 
-      fhr = fs->stat_leaf(parent, path, fh_type, sl_flags);
+      bool fast_attrs= fs->get_context()->_conf->rgw_nfs_s3_fast_attrs;
+
+      if ((flags & RGW_LOOKUP_FLAG_RCB) && fast_attrs) {
+       /* FAKE STAT--this should mean, interpolate special
+        * owner, group, and perms masks */
+       fhr = fs->fake_leaf(parent, path, fh_type, st, mask, sl_flags);
+      } else {
+       if ((fh_type == RGW_FS_TYPE_DIRECTORY) && fast_attrs) {
+         /* trust cached dir, if present */
+         fhr = fs->lookup_fh(parent, path, RGWFileHandle::FLAG_DIRECTORY);
+         if (get<0>(fhr)) {
+           rgw_fh = get<0>(fhr);
+           goto done;
+         }
+       }
+       fhr = fs->stat_leaf(parent, path, fh_type, sl_flags);
+      }
       if (! get<0>(fhr)) {
        if (! (flags & RGW_LOOKUP_FLAG_CREATE))
          return -ENOENT;
@@ -1785,6 +2332,7 @@ int rgw_lookup(struct rgw_fs *rgw_fs,
     }
   } /* !root */
 
+done:
   struct rgw_file_handle *rfh = rgw_fh->get_fh();
   *fh = rfh;
 
@@ -1919,8 +2467,8 @@ int rgw_readdir(struct rgw_fs *rgw_fs,
   if ((*offset == 0) &&
       (flags & RGW_READDIR_FLAG_DOTDOT)) {
     /* send '.' and '..' with their NFS-defined offsets */
-    rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
-    rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
+    rcb(".", cb_arg, 1, nullptr, 0, RGW_LOOKUP_FLAG_DIR);
+    rcb("..", cb_arg, 2, nullptr, 0, RGW_LOOKUP_FLAG_DIR);
   }
 
   int rc = parent->readdir(rcb, cb_arg, offset, eof, flags);
@@ -1947,8 +2495,8 @@ int rgw_readdir2(struct rgw_fs *rgw_fs,
   if ((! name) &&
       (flags & RGW_READDIR_FLAG_DOTDOT)) {
     /* send '.' and '..' with their NFS-defined offsets */
-    rcb(".", cb_arg, 1, RGW_LOOKUP_FLAG_DIR);
-    rcb("..", cb_arg, 2, RGW_LOOKUP_FLAG_DIR);
+    rcb(".", cb_arg, 1, nullptr, 0, RGW_LOOKUP_FLAG_DIR);
+    rcb("..", cb_arg, 2, nullptr, 0, RGW_LOOKUP_FLAG_DIR);
   }
 
   int rc = parent->readdir(rcb, cb_arg, name, eof, flags);
@@ -1985,6 +2533,20 @@ int rgw_read(struct rgw_fs *rgw_fs,
   return fs->read(rgw_fh, offset, length, bytes_read, buffer, flags);
 }
 
+/*
+   read symbolic link
+*/
+int rgw_readlink(struct rgw_fs *rgw_fs,
+            struct rgw_file_handle *fh, uint64_t offset,
+            size_t length, size_t *bytes_read, void *buffer,
+            uint32_t flags)
+{
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  return fs->readlink(rgw_fh, offset, length, bytes_read, buffer, flags);
+}
+
 /*
    write data to file
 */
@@ -2025,12 +2587,12 @@ class RGWReadV
 
 public:
   RGWReadV(buffer::list& _bl, rgw_vio* _vio) : vio(_vio) {
-    bl.claim(_bl);
+    bl = std::move(_bl);
   }
 
   struct rgw_vio* get_vio() { return vio; }
 
-  const std::list<buffer::ptr>& buffers() { return bl.buffers(); }
+  const auto& buffers() { return bl.buffers(); }
 
   unsigned /* XXX */ length() { return bl.length(); }
 
@@ -2103,6 +2665,7 @@ int rgw_writev(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
              rgw_uio *uio, uint32_t flags)
 {
 
+  // not supported - rest of function is ignored
   return -ENOTSUP;
 
   CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
@@ -2121,8 +2684,8 @@ int rgw_writev(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
   }
 
   std::string oname = rgw_fh->relative_object_name();
-  RGWPutObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(),
-                      oname, bl);
+  RGWPutObjRequest req(cct, rgwlib.get_store()->get_user(fs->get_user()->user_id),
+                      rgw_fh->bucket_name(), oname, bl);
 
   int rc = rgwlib.get_fe()->execute_req(&req);
 
@@ -2148,4 +2711,46 @@ int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
   return rgw_fh->commit(offset, length, RGWFileHandle::FLAG_NONE);
 }
 
+/*
+  extended attributes
+ */
+
+int rgw_getxattrs(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+                 rgw_xattrlist *attrs, rgw_getxattr_cb cb, void *cb_arg,
+                 uint32_t flags)
+{
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  return fs->getxattrs(rgw_fh, attrs, cb, cb_arg, flags);
+}
+
+int rgw_lsxattrs(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+                rgw_xattrstr *filter_prefix /* ignored */,
+                rgw_getxattr_cb cb, void *cb_arg, uint32_t flags)
+{
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  return fs->lsxattrs(rgw_fh, filter_prefix, cb, cb_arg, flags);
+}
+
+int rgw_setxattrs(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+                 rgw_xattrlist *attrs, uint32_t flags)
+{
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  return fs->setxattrs(rgw_fh, attrs, flags);
+}
+
+int rgw_rmxattrs(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+                rgw_xattrlist *attrs, uint32_t flags)
+{
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  return fs->rmxattrs(rgw_fh, attrs, flags);
+}
+
 } /* extern "C" */