]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_file.h
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_file.h
index 433356344e271dcaf842e7f3460219c52c7ba912..bfb95be737abe29216d666778c805f9fce0765ee 100644 (file)
@@ -34,6 +34,7 @@
 #include "rgw_lib.h"
 #include "rgw_ldap.h"
 #include "rgw_token.h"
+#include "rgw_compression.h"
 
 
 /* XXX
@@ -91,42 +92,50 @@ namespace rgw {
   struct fh_key
   {
     rgw_fh_hk fh_hk;
+    uint32_t version;
 
     static constexpr uint64_t seed = 8675309;
 
-    fh_key() {}
+    fh_key() : version(0) {}
 
     fh_key(const rgw_fh_hk& _hk)
-      : fh_hk(_hk) {
+      : fh_hk(_hk), version(0) {
       // nothing
     }
 
-    fh_key(const uint64_t bk, const uint64_t ok) {
+    fh_key(const uint64_t bk, const uint64_t ok)
+      : version(0) {
       fh_hk.bucket = bk;
       fh_hk.object = ok;
     }
 
-    fh_key(const uint64_t bk, const char *_o) {
+    fh_key(const uint64_t bk, const char *_o)
+      : version(0) {
       fh_hk.bucket = bk;
       fh_hk.object = XXH64(_o, ::strlen(_o), seed);
     }
     
-    fh_key(const std::string& _b, const std::string& _o) {
+    fh_key(const std::string& _b, const std::string& _o)
+      : version(0) {
       fh_hk.bucket = XXH64(_b.c_str(), _o.length(), seed);
       fh_hk.object = XXH64(_o.c_str(), _o.length(), seed);
     }
 
     void encode(buffer::list& bl) const {
-      ENCODE_START(1, 1, bl);
+      ENCODE_START(2, 1, bl);
       ::encode(fh_hk.bucket, bl);
       ::encode(fh_hk.object, bl);
+      ::encode((uint32_t)2, bl);
       ENCODE_FINISH(bl);
     }
 
     void decode(bufferlist::iterator& bl) {
-      DECODE_START(1, bl);
+      DECODE_START(2, bl);
       ::decode(fh_hk.bucket, bl);
       ::decode(fh_hk.object, bl);
+      if (struct_v >= 2) {
+       ::decode(version, bl);
+      }
       DECODE_FINISH(bl);
     }
   }; /* fh_key */
@@ -164,6 +173,8 @@ namespace rgw {
   using boost::variant;
   using boost::container::flat_map;
 
+  typedef std::tuple<bool, bool> DecodeAttrsResult;
+
   class RGWFileHandle : public cohort::lru::Object
   {
     struct rgw_file_handle fh;
@@ -195,8 +206,9 @@ namespace rgw {
       struct timespec ctime;
       struct timespec mtime;
       struct timespec atime;
+      uint32_t version;
       State() : dev(0), size(0), nlink(1), owner_uid(0), owner_gid(0),
-               ctime{0,0}, mtime{0,0}, atime{0,0} {}
+               ctime{0,0}, mtime{0,0}, atime{0,0}, version(0) {}
     } state;
 
     struct file {
@@ -241,6 +253,7 @@ namespace rgw {
     static constexpr uint32_t FLAG_LOCKED = 0x0200;
     static constexpr uint32_t FLAG_STATELESS_OPEN = 0x0400;
     static constexpr uint32_t FLAG_EXACT_MATCH = 0x0800;
+    static constexpr uint32_t FLAG_MOUNT = 0x1000;
 
 #define CREATE_FLAGS(x) \
     ((x) & ~(RGWFileHandle::FLAG_CREATE|RGWFileHandle::FLAG_LOCK))
@@ -248,33 +261,47 @@ namespace rgw {
     friend class RGWLibFS;
 
   private:
-    RGWFileHandle(RGWLibFS* _fs, uint32_t fs_inst)
+    RGWFileHandle(RGWLibFS* _fs)
       : fs(_fs), bucket(nullptr), parent(nullptr), variant_type{directory()},
-       depth(0), flags(FLAG_ROOT)
+       depth(0), flags(FLAG_NONE)
       {
        /* root */
        fh.fh_type = RGW_FS_TYPE_DIRECTORY;
        variant_type = directory();
        /* stat */
-       state.dev = fs_inst;
        state.unix_mode = RGW_RWXMODE|S_IFDIR;
        /* pointer to self */
        fh.fh_private = this;
       }
 
-    void init_rootfs(std::string& fsid, const std::string& object_name) {
+    uint64_t init_fsid(std::string& uid) {
+      return XXH64(uid.c_str(), uid.length(), fh_key::seed);
+    }
+
+    void init_rootfs(std::string& fsid, const std::string& object_name,
+                     bool is_bucket) {
       /* fh_key */
       fh.fh_hk.bucket = XXH64(fsid.c_str(), fsid.length(), fh_key::seed);
       fh.fh_hk.object = XXH64(object_name.c_str(), object_name.length(),
                              fh_key::seed);
       fhk = fh.fh_hk;
       name = object_name;
+
+      state.dev = init_fsid(fsid);
+
+      if (is_bucket) {
+        flags |= RGWFileHandle::FLAG_BUCKET | RGWFileHandle::FLAG_MOUNT;
+        bucket = this;
+        depth = 1;
+      } else {
+        flags |= RGWFileHandle::FLAG_ROOT | RGWFileHandle::FLAG_MOUNT;
+      }
     }
 
   public:
-    RGWFileHandle(RGWLibFS* fs, uint32_t fs_inst, RGWFileHandle* _parent,
+    RGWFileHandle(RGWLibFS* _fs, RGWFileHandle* _parent,
                  const fh_key& _fhk, std::string& _name, uint32_t _flags)
-      : fs(fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
+      : fs(_fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
        fhk(_fhk), flags(_flags) {
 
       if (parent->is_root()) {
@@ -282,7 +309,7 @@ namespace rgw {
        variant_type = directory();
        flags |= FLAG_BUCKET;
       } else {
-       bucket = (parent->flags & FLAG_BUCKET) ? parent
+       bucket = parent->is_bucket() ? parent
          : parent->bucket;
        if (flags & FLAG_DIRECTORY) {
          fh.fh_type = RGW_FS_TYPE_DIRECTORY;
@@ -298,8 +325,8 @@ namespace rgw {
       /* save constant fhk */
       fh.fh_hk = fhk.fh_hk; /* XXX redundant in fh_hk */
 
-      /* stat */
-      state.dev = fs_inst;
+      /* inherits parent's fsid */
+      state.dev = parent->state.dev;
 
       switch (fh.fh_type) {
       case RGW_FS_TYPE_DIRECTORY:
@@ -410,7 +437,7 @@ namespace rgw {
     const std::string& bucket_name() const {
       if (is_root())
        return root_name;
-      if (flags & FLAG_BUCKET)
+      if (is_bucket())
        return name;
       return bucket->object_name();
     }
@@ -446,12 +473,15 @@ namespace rgw {
       return full_object_name(true /* omit_bucket */);
     }
 
-    inline std::string format_child_name(const std::string& cbasename) const {
+    inline std::string format_child_name(const std::string& cbasename,
+                                         bool is_dir) const {
       std::string child_name{relative_object_name()};
       if ((child_name.size() > 0) &&
          (child_name.back() != '/'))
        child_name += "/";
       child_name += cbasename;
+      if (is_dir)
+       child_name += "/";
       return child_name;
     }
 
@@ -493,8 +523,17 @@ namespace rgw {
       return nullptr;
     }
 
+    int offset_of(const std::string& name, int64_t *offset, uint32_t flags) {
+      if (unlikely(! is_dir())) {
+       return -EINVAL;
+      }
+      *offset = XXH64(name.c_str(), name.length(), fh_key::seed);
+      return 0;
+    }
+
     bool is_open() const { return flags & FLAG_OPEN; }
     bool is_root() const { return flags & FLAG_ROOT; }
+    bool is_mount() const { return flags & FLAG_MOUNT; }
     bool is_bucket() const { return flags & FLAG_BUCKET; }
     bool is_object() const { return !is_bucket(); }
     bool is_file() const { return (fh.fh_type == RGW_FS_TYPE_FILE); }
@@ -506,7 +545,7 @@ namespace rgw {
 
     int open(uint32_t gsh_flags) {
       lock_guard guard(mtx);
-      if (! (flags & FLAG_OPEN)) {
+      if (! is_open()) {
        if (gsh_flags & RGW_OPEN_FLAG_V3) {
          flags |= FLAG_STATELESS_OPEN;
        }
@@ -516,8 +555,11 @@ namespace rgw {
       return -EPERM;
     }
 
-    int readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof,
-               uint32_t flags);
+    typedef boost::variant<uint64_t*, const char*> readdir_offset;
+
+    int readdir(rgw_readdir_cb rcb, void *cb_arg, readdir_offset offset,
+               bool *eof, uint32_t flags);
+
     int write(uint64_t off, size_t len, size_t *nbytes, void *buffer);
 
     int commit(uint64_t offset, uint64_t length, uint32_t flags) {
@@ -574,7 +616,7 @@ namespace rgw {
     }
 
     void encode(buffer::list& bl) const {
-      ENCODE_START(1, 1, bl);
+      ENCODE_START(2, 1, bl);
       ::encode(uint32_t(fh.fh_type), bl);
       ::encode(state.dev, bl);
       ::encode(state.size, bl);
@@ -585,11 +627,12 @@ namespace rgw {
       for (const auto& t : { state.ctime, state.mtime, state.atime }) {
        ::encode(real_clock::from_timespec(t), bl);
       }
+      ::encode((uint32_t)2, bl);
       ENCODE_FINISH(bl);
     }
 
     void decode(bufferlist::iterator& bl) {
-      DECODE_START(1, bl);
+      DECODE_START(2, bl);
       uint32_t fh_type;
       ::decode(fh_type, bl);
       assert(fh.fh_type == fh_type);
@@ -604,14 +647,17 @@ namespace rgw {
        ::decode(enc_time, bl);
        *t = real_clock::to_timespec(enc_time);
       }
+      if (struct_v >= 2) {
+        ::decode(state.version, bl);
+      }
       DECODE_FINISH(bl);
     }
 
     void encode_attrs(ceph::buffer::list& ux_key1,
                      ceph::buffer::list& ux_attrs1);
 
-    void decode_attrs(const ceph::buffer::list* ux_key1,
-                     const ceph::buffer::list* ux_attrs1);
+    DecodeAttrsResult decode_attrs(const ceph::buffer::list* ux_key1,
+                                   const ceph::buffer::list* ux_attrs1);
 
     void invalidate();
 
@@ -674,7 +720,6 @@ namespace rgw {
     {
     public:
       RGWLibFS* fs;
-      uint32_t fs_inst;
       RGWFileHandle* parent;
       const fh_key& fhk;
       std::string& name;
@@ -682,20 +727,20 @@ namespace rgw {
 
       Factory() = delete;
 
-      Factory(RGWLibFS* fs, uint32_t fs_inst, RGWFileHandle* parent,
-             const fh_key& fhk, std::string& name, uint32_t flags)
-       : fs(fs), fs_inst(fs_inst), parent(parent), fhk(fhk), name(name),
-         flags(flags) {}
+      Factory(RGWLibFS* _fs, RGWFileHandle* _parent,
+             const fh_key& _fhk, std::string& _name, uint32_t _flags)
+       : fs(_fs), parent(_parent), fhk(_fhk), name(_name),
+         flags(_flags) {}
 
       void recycle (cohort::lru::Object* o) override {
        /* re-use an existing object */
        o->~Object(); // call lru::Object virtual dtor
        // placement new!
-       new (o) RGWFileHandle(fs, fs_inst, parent, fhk, name, flags);
+       new (o) RGWFileHandle(fs, parent, fhk, name, flags);
       }
 
       cohort::lru::Object* alloc() override {
-       return new RGWFileHandle(fs, fs_inst, parent, fhk, name, flags);
+       return new RGWFileHandle(fs, parent, fhk, name, flags);
       }
     }; /* Factory */
 
@@ -729,7 +774,7 @@ namespace rgw {
   class RGWLibFS
   {
     CephContext* cct;
-    struct rgw_fs fs;
+    struct rgw_fs fs{};
     RGWFileHandle root_fh;
     rgw_fh_callback_t invalidate_cb;
     void *invalidate_arg;
@@ -748,7 +793,6 @@ namespace rgw {
     static std::atomic<uint32_t> fs_inst_counter;
 
     static uint32_t write_completion_interval_s;
-    std::string fsid;
 
     using lock_guard = std::lock_guard<std::mutex>;
     using unique_lock = std::unique_lock<std::mutex>;
@@ -778,7 +822,7 @@ namespace rgw {
       }
 
       void operator()() {
-       rgw_fh.write_finish();
+       rgw_fh.close(); /* will finish in-progress write */
        rgw_fh.get_fs()->unref(&rgw_fh);
       }
     };
@@ -809,9 +853,16 @@ namespace rgw {
     static constexpr uint32_t FLAG_NONE =      0x0000;
     static constexpr uint32_t FLAG_CLOSED =    0x0001;
 
+    struct BucketStats {
+      size_t size;
+      size_t size_rounded;
+      real_time creation_time;
+      uint64_t num_entries;
+    };
+
     RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
-           const char* _key)
-      : cct(_cct), root_fh(this, new_inst()), invalidate_cb(nullptr),
+           const char* _key, const char *root)
+      : cct(_cct), root_fh(this), invalidate_cb(nullptr),
        invalidate_arg(nullptr), shutdown(false), refcnt(1),
        fh_cache(cct->_conf->rgw_nfs_fhcache_partitions,
                 cct->_conf->rgw_nfs_fhcache_size),
@@ -819,17 +870,19 @@ namespace rgw {
               cct->_conf->rgw_nfs_lru_lane_hiwat),
        uid(_uid), key(_user_id, _key) {
 
-      /* no bucket may be named rgw_fs_inst-(.*) */
-      fsid = RGWFileHandle::root_name + "rgw_fs_inst-" +
-       std::to_string(get_inst());
-
-      root_fh.init_rootfs(fsid /* bucket */, RGWFileHandle::root_name);
+      if (!root || !strcmp(root, "/")) {
+        root_fh.init_rootfs(uid, RGWFileHandle::root_name, false);
+      } else {
+        root_fh.init_rootfs(uid, root, true);
+      }
 
       /* pointer to self */
       fs.fs_private = this;
 
       /* expose public root fh */
       fs.root_fh = root_fh.get_fh();
+
+      new_inst();
     }
 
     friend void intrusive_ptr_add_ref(const RGWLibFS* fs) {
@@ -914,6 +967,7 @@ namespace rgw {
       LookupFHResult fhr { nullptr, uint32_t(RGWFileHandle::FLAG_NONE) };
 
       RGWFileHandle::FHCache::Latch lat;
+      bool fh_locked = flags & RGWFileHandle::FLAG_LOCKED;
 
     retry:
       RGWFileHandle* fh =
@@ -922,11 +976,13 @@ namespace rgw {
                            RGWFileHandle::FHCache::FLAG_LOCK);
       /* LATCHED */
       if (fh) {
-       fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
+       if (likely(! fh_locked))
+           fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
        /* need initial ref from LRU (fast path) */
        if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
          lat.lock->unlock();
-         fh->mtx.unlock();
+         if (likely(! fh_locked))
+           fh->mtx.unlock();
          goto retry; /* !LATCHED */
        }
        /* LATCHED, LOCKED */
@@ -958,6 +1014,7 @@ namespace rgw {
        return fhr;
 
       RGWFileHandle::FHCache::Latch lat;
+      bool fh_locked = flags & RGWFileHandle::FLAG_LOCKED;
 
       std::string obj_name{name};
       std::string key_name{parent->make_key_name(name)};
@@ -968,7 +1025,7 @@ namespace rgw {
        << " (" << obj_name << ")"
        << dendl;
 
-      fh_key fhk = parent->make_fhk(key_name);
+      fh_key fhk = parent->make_fhk(obj_name);
 
     retry:
       RGWFileHandle* fh =
@@ -977,40 +1034,53 @@ namespace rgw {
                            RGWFileHandle::FHCache::FLAG_LOCK);
       /* LATCHED */
       if (fh) {
-       fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
+       if (likely(! fh_locked))
+         fh->mtx.lock(); // XXX !RAII because may-return-LOCKED
        if (fh->flags & RGWFileHandle::FLAG_DELETED) {
          /* for now, delay briefly and retry */
          lat.lock->unlock();
-         fh->mtx.unlock();
+         if (likely(! fh_locked))
+           fh->mtx.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(20));
          goto retry; /* !LATCHED */
        }
        /* need initial ref from LRU (fast path) */
        if (! fh_lru.ref(fh, cohort::lru::FLAG_INITIAL)) {
          lat.lock->unlock();
-         fh->mtx.unlock();
+         if (likely(! fh_locked))
+           fh->mtx.unlock();
          goto retry; /* !LATCHED */
        }
        /* LATCHED, LOCKED */
        if (! (flags & RGWFileHandle::FLAG_LOCK))
-         fh->mtx.unlock(); /* ! LOCKED */
+         if (likely(! fh_locked))
+           fh->mtx.unlock(); /* ! LOCKED */
       } else {
        /* make or re-use handle */
-       RGWFileHandle::Factory prototype(this, get_inst(), parent, fhk,
+       RGWFileHandle::Factory prototype(this, parent, fhk,
                                         obj_name, CREATE_FLAGS(flags));
+       uint32_t iflags{cohort::lru::FLAG_INITIAL};
        fh = static_cast<RGWFileHandle*>(
          fh_lru.insert(&prototype,
                        cohort::lru::Edge::MRU,
-                       cohort::lru::FLAG_INITIAL));
+                       iflags));
        if (fh) {
          /* lock fh (LATCHED) */
          if (flags & RGWFileHandle::FLAG_LOCK)
            fh->mtx.lock();
-         /* inserts, releasing latch */
-         fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
+         if (likely(! (iflags & cohort::lru::FLAG_RECYCLE))) {
+           /* inserts at cached insert iterator, releasing latch */
+           fh_cache.insert_latched(
+             fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
+         } else {
+           /* recycle step invalidates Latch */
+           fh_cache.insert(
+             fhk.fh_hk.object, fh, RGWFileHandle::FHCache::FLAG_NONE);
+           lat.lock->unlock(); /* !LATCHED */
+         }
          get<1>(fhr) |= RGWFileHandle::FLAG_CREATE;
          /* ref parent (non-initial ref cannot fail on valid object) */
-         if (! parent->is_root()) {
+         if (! parent->is_mount()) {
            (void) fh_lru.ref(parent, cohort::lru::FLAG_NONE);
          }
          goto out; /* !LATCHED */
@@ -1031,13 +1101,13 @@ namespace rgw {
     } /*  lookup_fh(RGWFileHandle*, const char *, const uint32_t) */
 
     inline void unref(RGWFileHandle* fh) {
-      if (likely(! fh->is_root())) {
+      if (likely(! fh->is_mount())) {
        (void) fh_lru.unref(fh, cohort::lru::FLAG_NONE);
       }
     }
 
     inline RGWFileHandle* ref(RGWFileHandle* fh) {
-      if (likely(! fh->is_root())) {
+      if (likely(! fh->is_mount())) {
        fh_lru.ref(fh, cohort::lru::FLAG_NONE);
       }
       return fh;
@@ -1048,8 +1118,11 @@ namespace rgw {
     int setattr(RGWFileHandle* rgw_fh, struct stat* st, uint32_t mask,
                uint32_t flags);
 
-    LookupFHResult stat_bucket(RGWFileHandle* parent,
-                              const char *path, uint32_t flags);
+    void update_fh(RGWFileHandle *rgw_fh);
+
+    LookupFHResult stat_bucket(RGWFileHandle* parent, const char *path,
+                              RGWLibFS::BucketStats& bs,
+                              uint32_t flags);
 
     LookupFHResult stat_leaf(RGWFileHandle* parent, const char *path,
                             enum rgw_fh_type type = RGW_FS_TYPE_NIL,
@@ -1066,8 +1139,6 @@ namespace rgw {
 
     MkObjResult mkdir(RGWFileHandle* parent, const char *name, struct stat *st,
                      uint32_t mask, uint32_t flags);
-    MkObjResult mkdir2(RGWFileHandle* parent, const char *name, struct stat *st,
-                     uint32_t mask, uint32_t flags);
 
     int unlink(RGWFileHandle* rgw_fh, const char *name,
               uint32_t flags = FLAG_NONE);
@@ -1117,7 +1188,6 @@ namespace rgw {
       if (! fh) {
        if (unlikely(fh_hk == root_fh.fh.fh_hk)) {
          fh = &root_fh;
-         ref(fh);
        }
       }
 
@@ -1130,7 +1200,7 @@ namespace rgw {
 
     struct rgw_fs* get_fs() { return &fs; }
 
-    uint32_t get_inst() { return root_fh.state.dev; }
+    uint64_t get_fsid() { return root_fh.state.dev; }
 
     RGWUserInfo* get_user() { return &user; }
 
@@ -1157,20 +1227,32 @@ class RGWListBucketsRequest : public RGWLibRequest,
 {
 public:
   RGWFileHandle* rgw_fh;
-  uint64_t* offset;
+  RGWFileHandle::readdir_offset offset;
   void* cb_arg;
   rgw_readdir_cb rcb;
+  uint64_t* ioff;
   size_t ix;
   uint32_t d_count;
 
   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
                        RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
-                       void* _cb_arg, uint64_t* _offset)
+                       void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
-    const auto& mk = rgw_fh->find_marker(*offset);
-    if (mk) {
-      marker = mk->name;
+      cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
+
+    using boost::get;
+
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      ioff = get<uint64_t*>(offset);
+      const auto& mk = rgw_fh->find_marker(*ioff);
+      if (mk) {
+       marker = mk->name;
+      }
+    } else {
+      const char* mk = get<const char*>(offset);
+      if (mk) {
+       marker = mk;
+      }
     }
     op = this;
   }
@@ -1241,7 +1323,9 @@ public:
   int operator()(const boost::string_ref& name,
                 const boost::string_ref& marker) {
     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
-    *offset = off;
+    if (!! ioff) {
+      *ioff = off;
+    }
     /* update traversal cache */
     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
                       RGW_FS_TYPE_DIRECTORY);
@@ -1250,9 +1334,15 @@ public:
   }
 
   bool eof() {
-    lsubdout(cct, rgw, 15) << "READDIR offset: " << *offset
-                          << " is_truncated: " << is_truncated
-                          << dendl;
+    if (unlikely(cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15))) {
+      bool is_offset =
+       unlikely(! get<const char*>(&offset)) ||
+       !! get<const char*>(offset);
+      lsubdout(cct, rgw, 15) << "READDIR offset: " <<
+       ((is_offset) ? offset : "(nil)")
+                            << " is_truncated: " << is_truncated
+                            << dendl;
+    }
     return !is_truncated;
   }
 
@@ -1267,26 +1357,42 @@ class RGWReaddirRequest : public RGWLibRequest,
 {
 public:
   RGWFileHandle* rgw_fh;
-  uint64_t* offset;
+  RGWFileHandle::readdir_offset offset;
   void* cb_arg;
   rgw_readdir_cb rcb;
+  uint64_t* ioff;
   size_t ix;
   uint32_t d_count;
 
   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
                    RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
-                   void* _cb_arg, uint64_t* _offset)
+                   void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
-    const auto& mk = rgw_fh->find_marker(*offset);
-    if (mk) {
-      marker = *mk;
+      cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
+
+    using boost::get;
+
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      ioff = get<uint64_t*>(offset);
+      const auto& mk = rgw_fh->find_marker(*ioff);
+      if (mk) {
+       marker = *mk;
+      }
+    } else {
+      const char* mk = get<const char*>(offset);
+      if (mk) {
+       std::string tmark{rgw_fh->relative_object_name()};
+       tmark += "/";
+       tmark += mk;    
+       marker = rgw_obj_key{std::move(tmark), "", ""};
+      }
     }
+
     default_max = 1000; // XXX was being omitted
     op = this;
   }
 
-  bool only_bucket() override { return false; }
+  bool only_bucket() override { return true; }
 
   int op_init() override {
     // assign store, s, and dialect_handler
@@ -1330,7 +1436,9 @@ public:
 
     /* hash offset of name in parent (short name) for NFS readdir cookie */
     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
-    *offset = off;
+    if (unlikely(!! ioff)) {
+      *ioff = off;
+    }
     /* update traversal cache */
     rgw_fh->add_marker(off, marker, type);
     ++d_count;
@@ -1420,10 +1528,16 @@ public:
   }
 
   bool eof() {
-    lsubdout(cct, rgw, 15) << "READDIR offset: " << *offset
-                          << " next marker: " << next_marker
-                          << " is_truncated: " << is_truncated
-                          << dendl;
+    if (unlikely(cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15))) {
+      bool is_offset =
+       unlikely(! get<const char*>(&offset)) ||
+       !! get<const char*>(offset);
+      lsubdout(cct, rgw, 15) << "READDIR offset: " <<
+       ((is_offset) ? offset : "(nil)")
+                            << " next marker: " << next_marker
+                            << " is_truncated: " << is_truncated
+                            << dendl;
+    }
     return !is_truncated;
   }
 
@@ -1449,7 +1563,7 @@ public:
     op = this;
   }
 
-  bool only_bucket() override { return false; }
+  bool only_bucket() override { return true; }
 
   int op_init() override {
     // assign store, s, and dialect_handler
@@ -1520,11 +1634,11 @@ class RGWCreateBucketRequest : public RGWLibRequest,
                               public RGWCreateBucket /* RGWOp */
 {
 public:
-  std::string& uri;
+  const std::string& bucket_name;
 
   RGWCreateBucketRequest(CephContext* _cct, RGWUserInfo *_user,
-                       std::string& _uri)
-    : RGWLibRequest(_cct, _user), uri(_uri) {
+                       std::string& _bname)
+    : RGWLibRequest(_cct, _user), bucket_name(_bname) {
     op = this;
   }
 
@@ -1552,6 +1666,7 @@ public:
     s->info.method = "PUT";
     s->op = OP_PUT;
 
+    string uri = "/" + bucket_name;
     /* XXX derp derp derp */
     s->relative_uri = uri;
     s->info.request_uri = uri; // XXX
@@ -1587,11 +1702,11 @@ class RGWDeleteBucketRequest : public RGWLibRequest,
                               public RGWDeleteBucket /* RGWOp */
 {
 public:
-  std::string& uri;
+  const std::string& bucket_name;
 
   RGWDeleteBucketRequest(CephContext* _cct, RGWUserInfo *_user,
-                       std::string& _uri)
-    : RGWLibRequest(_cct, _user), uri(_uri) {
+                       std::string& _bname)
+    : RGWLibRequest(_cct, _user), bucket_name(_bname) {
     op = this;
   }
 
@@ -1614,6 +1729,7 @@ public:
     s->info.method = "DELETE";
     s->op = OP_DELETE;
 
+    string uri = "/" + bucket_name;
     /* XXX derp derp derp */
     s->relative_uri = uri;
     s->info.request_uri = uri; // XXX
@@ -1975,10 +2091,12 @@ class RGWStatBucketRequest : public RGWLibRequest,
 public:
   std::string uri;
   std::map<std::string, buffer::list> attrs;
+  RGWLibFS::BucketStats& bs;
 
   RGWStatBucketRequest(CephContext* _cct, RGWUserInfo *_user,
-                      const std::string& _path)
-    : RGWLibRequest(_cct, _user) {
+                      const std::string& _path,
+                      RGWLibFS::BucketStats& _stats)
+    : RGWLibRequest(_cct, _user), bs(_stats) {
     uri = "/" + _path;
     op = this;
   }
@@ -2030,6 +2148,10 @@ public:
 
   void send_response() override {
     bucket.creation_time = get_state()->bucket_info.creation_time;
+    bs.size = bucket.size;
+    bs.size_rounded = bucket.size_rounded;
+    bs.creation_time = bucket.creation_time;
+    bs.num_entries = bucket.count;
     std::swap(attrs, get_state()->bucket_attrs);
   }
 
@@ -2057,7 +2179,7 @@ public:
     op = this;
   }
 
-  bool only_bucket() override { return false; }
+  bool only_bucket() override { return true; }
 
   int op_init() override {
     // assign store, s, and dialect_handler
@@ -2151,7 +2273,10 @@ public:
   const std::string& bucket_name;
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
-  RGWPutObjProcessor *processor;
+  RGWPutObjProcessor* processor;
+  RGWPutObjDataProcessor* filter;
+  boost::optional<RGWPutObj_Compress> compressor;
+  CompressorRef plugin;
   buffer::list data;
   uint64_t timer_id;
   MD5 hash;
@@ -2163,8 +2288,8 @@ public:
   RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
                  const std::string& _bname, const std::string& _oname)
     : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
-      rgw_fh(_fh), processor(nullptr), real_ofs(0), bytes_written(0),
-      multipart(false), eio(false) {
+      rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
+      bytes_written(0), multipart(false), eio(false) {
 
     int ret = header_init();
     if (ret == 0) {
@@ -2302,12 +2427,12 @@ public:
 
     src_bucket_name = src_parent->bucket_name();
     // need s->src_bucket_name?
-    src_object.name = src_parent->format_child_name(src_name);
+    src_object.name = src_parent->format_child_name(src_name, false);
     // need s->src_object?
 
     dest_bucket_name = dst_parent->bucket_name();
     // need s->bucket.name?
-    dest_object = dst_parent->format_child_name(dst_name);
+    dest_object = dst_parent->format_child_name(dst_name, false);
     // need s->object_name?
 
     int rc = valid_s3_object_name(dest_object);
@@ -2317,8 +2442,7 @@ public:
     /* XXX and fixup key attr (could optimize w/string ref and
      * dest_object) */
     buffer::list ux_key;
-    std::string key_name{dst_parent->make_key_name(dst_name.c_str())};
-    fh_key fhk = dst_parent->make_fhk(key_name);
+    fh_key fhk = dst_parent->make_fhk(dst_name);
     rgw::encode(fhk, ux_key);
     emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
 
@@ -2404,6 +2528,49 @@ public:
 
 }; /* RGWSetAttrsRequest */
 
+/*
+ * Send request to get the rados cluster stats
+ */
+class RGWGetClusterStatReq : public RGWLibRequest,
+        public RGWGetClusterStat {
+public:
+  struct rados_cluster_stat_t& stats_req;
+  RGWGetClusterStatReq(CephContext* _cct,RGWUserInfo *_user,
+                       rados_cluster_stat_t& _stats):
+  RGWLibRequest(_cct, _user), stats_req(_stats){
+    op = this;
+  }
+
+  int op_init() override {
+    // assign store, s, and dialect_handler
+    RGWObjectCtx* rados_ctx
+      = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
+    // framework promises to call op_init after parent init
+    assert(rados_ctx);
+    RGWOp::init(rados_ctx->store, get_state(), this);
+    op = this; // assign self as op: REQUIRED
+    return 0;
+  }
+
+  int header_init() override {
+    struct req_state* s = get_state();
+    s->info.method = "GET";
+    s->op = OP_GET;
+    s->user = user;
+    return 0;
+  }
+
+  int get_params() override { return 0; }
+  bool only_bucket() override { return false; }
+  void send_response() override {
+    stats_req.kb = stats_op.kb;
+    stats_req.kb_avail = stats_op.kb_avail;
+    stats_req.kb_used = stats_op.kb_used;
+    stats_req.num_objects = stats_op.num_objects;
+  }
+}; /* RGWGetClusterStatReq */
+
+
 } /* namespace rgw */
 
 #endif /* RGW_FILE_H */