]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_file.h
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_file.h
index c7b46c6fba162732e584dcf8bb45d104bfdcb06e..bfb95be737abe29216d666778c805f9fce0765ee 100644 (file)
@@ -34,6 +34,7 @@
 #include "rgw_lib.h"
 #include "rgw_ldap.h"
 #include "rgw_token.h"
+#include "rgw_compression.h"
 
 
 /* XXX
@@ -91,42 +92,50 @@ namespace rgw {
   struct fh_key
   {
     rgw_fh_hk fh_hk;
+    uint32_t version;
 
     static constexpr uint64_t seed = 8675309;
 
-    fh_key() {}
+    fh_key() : version(0) {}
 
     fh_key(const rgw_fh_hk& _hk)
-      : fh_hk(_hk) {
+      : fh_hk(_hk), version(0) {
       // nothing
     }
 
-    fh_key(const uint64_t bk, const uint64_t ok) {
+    fh_key(const uint64_t bk, const uint64_t ok)
+      : version(0) {
       fh_hk.bucket = bk;
       fh_hk.object = ok;
     }
 
-    fh_key(const uint64_t bk, const char *_o) {
+    fh_key(const uint64_t bk, const char *_o)
+      : version(0) {
       fh_hk.bucket = bk;
       fh_hk.object = XXH64(_o, ::strlen(_o), seed);
     }
     
-    fh_key(const std::string& _b, const std::string& _o) {
+    fh_key(const std::string& _b, const std::string& _o)
+      : version(0) {
       fh_hk.bucket = XXH64(_b.c_str(), _o.length(), seed);
       fh_hk.object = XXH64(_o.c_str(), _o.length(), seed);
     }
 
     void encode(buffer::list& bl) const {
-      ENCODE_START(1, 1, bl);
+      ENCODE_START(2, 1, bl);
       ::encode(fh_hk.bucket, bl);
       ::encode(fh_hk.object, bl);
+      ::encode((uint32_t)2, bl);
       ENCODE_FINISH(bl);
     }
 
     void decode(bufferlist::iterator& bl) {
-      DECODE_START(1, bl);
+      DECODE_START(2, bl);
       ::decode(fh_hk.bucket, bl);
       ::decode(fh_hk.object, bl);
+      if (struct_v >= 2) {
+       ::decode(version, bl);
+      }
       DECODE_FINISH(bl);
     }
   }; /* fh_key */
@@ -164,6 +173,8 @@ namespace rgw {
   using boost::variant;
   using boost::container::flat_map;
 
+  typedef std::tuple<bool, bool> DecodeAttrsResult;
+
   class RGWFileHandle : public cohort::lru::Object
   {
     struct rgw_file_handle fh;
@@ -195,8 +206,9 @@ namespace rgw {
       struct timespec ctime;
       struct timespec mtime;
       struct timespec atime;
+      uint32_t version;
       State() : dev(0), size(0), nlink(1), owner_uid(0), owner_gid(0),
-               ctime{0,0}, mtime{0,0}, atime{0,0} {}
+               ctime{0,0}, mtime{0,0}, atime{0,0}, version(0) {}
     } state;
 
     struct file {
@@ -241,6 +253,7 @@ namespace rgw {
     static constexpr uint32_t FLAG_LOCKED = 0x0200;
     static constexpr uint32_t FLAG_STATELESS_OPEN = 0x0400;
     static constexpr uint32_t FLAG_EXACT_MATCH = 0x0800;
+    static constexpr uint32_t FLAG_MOUNT = 0x1000;
 
 #define CREATE_FLAGS(x) \
     ((x) & ~(RGWFileHandle::FLAG_CREATE|RGWFileHandle::FLAG_LOCK))
@@ -248,33 +261,47 @@ namespace rgw {
     friend class RGWLibFS;
 
   private:
-    RGWFileHandle(RGWLibFS* _fs, uint32_t fs_inst)
+    RGWFileHandle(RGWLibFS* _fs)
       : fs(_fs), bucket(nullptr), parent(nullptr), variant_type{directory()},
-       depth(0), flags(FLAG_ROOT)
+       depth(0), flags(FLAG_NONE)
       {
        /* root */
        fh.fh_type = RGW_FS_TYPE_DIRECTORY;
        variant_type = directory();
        /* stat */
-       state.dev = fs_inst;
        state.unix_mode = RGW_RWXMODE|S_IFDIR;
        /* pointer to self */
        fh.fh_private = this;
       }
 
-    void init_rootfs(std::string& fsid, const std::string& object_name) {
+    uint64_t init_fsid(std::string& uid) {
+      return XXH64(uid.c_str(), uid.length(), fh_key::seed);
+    }
+
+    void init_rootfs(std::string& fsid, const std::string& object_name,
+                     bool is_bucket) {
       /* fh_key */
       fh.fh_hk.bucket = XXH64(fsid.c_str(), fsid.length(), fh_key::seed);
       fh.fh_hk.object = XXH64(object_name.c_str(), object_name.length(),
                              fh_key::seed);
       fhk = fh.fh_hk;
       name = object_name;
+
+      state.dev = init_fsid(fsid);
+
+      if (is_bucket) {
+        flags |= RGWFileHandle::FLAG_BUCKET | RGWFileHandle::FLAG_MOUNT;
+        bucket = this;
+        depth = 1;
+      } else {
+        flags |= RGWFileHandle::FLAG_ROOT | RGWFileHandle::FLAG_MOUNT;
+      }
     }
 
   public:
-    RGWFileHandle(RGWLibFS* fs, uint32_t fs_inst, RGWFileHandle* _parent,
+    RGWFileHandle(RGWLibFS* _fs, RGWFileHandle* _parent,
                  const fh_key& _fhk, std::string& _name, uint32_t _flags)
-      : fs(fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
+      : fs(_fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
        fhk(_fhk), flags(_flags) {
 
       if (parent->is_root()) {
@@ -298,8 +325,8 @@ namespace rgw {
       /* save constant fhk */
       fh.fh_hk = fhk.fh_hk; /* XXX redundant in fh_hk */
 
-      /* stat */
-      state.dev = fs_inst;
+      /* inherits parent's fsid */
+      state.dev = parent->state.dev;
 
       switch (fh.fh_type) {
       case RGW_FS_TYPE_DIRECTORY:
@@ -496,8 +523,17 @@ namespace rgw {
       return nullptr;
     }
 
+    int offset_of(const std::string& name, int64_t *offset, uint32_t flags) {
+      if (unlikely(! is_dir())) {
+       return -EINVAL;
+      }
+      *offset = XXH64(name.c_str(), name.length(), fh_key::seed);
+      return 0;
+    }
+
     bool is_open() const { return flags & FLAG_OPEN; }
     bool is_root() const { return flags & FLAG_ROOT; }
+    bool is_mount() const { return flags & FLAG_MOUNT; }
     bool is_bucket() const { return flags & FLAG_BUCKET; }
     bool is_object() const { return !is_bucket(); }
     bool is_file() const { return (fh.fh_type == RGW_FS_TYPE_FILE); }
@@ -519,8 +555,11 @@ namespace rgw {
       return -EPERM;
     }
 
-    int readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof,
-               uint32_t flags);
+    typedef boost::variant<uint64_t*, const char*> readdir_offset;
+
+    int readdir(rgw_readdir_cb rcb, void *cb_arg, readdir_offset offset,
+               bool *eof, uint32_t flags);
+
     int write(uint64_t off, size_t len, size_t *nbytes, void *buffer);
 
     int commit(uint64_t offset, uint64_t length, uint32_t flags) {
@@ -577,7 +616,7 @@ namespace rgw {
     }
 
     void encode(buffer::list& bl) const {
-      ENCODE_START(1, 1, bl);
+      ENCODE_START(2, 1, bl);
       ::encode(uint32_t(fh.fh_type), bl);
       ::encode(state.dev, bl);
       ::encode(state.size, bl);
@@ -588,11 +627,12 @@ namespace rgw {
       for (const auto& t : { state.ctime, state.mtime, state.atime }) {
        ::encode(real_clock::from_timespec(t), bl);
       }
+      ::encode((uint32_t)2, bl);
       ENCODE_FINISH(bl);
     }
 
     void decode(bufferlist::iterator& bl) {
-      DECODE_START(1, bl);
+      DECODE_START(2, bl);
       uint32_t fh_type;
       ::decode(fh_type, bl);
       assert(fh.fh_type == fh_type);
@@ -607,14 +647,17 @@ namespace rgw {
        ::decode(enc_time, bl);
        *t = real_clock::to_timespec(enc_time);
       }
+      if (struct_v >= 2) {
+        ::decode(state.version, bl);
+      }
       DECODE_FINISH(bl);
     }
 
     void encode_attrs(ceph::buffer::list& ux_key1,
                      ceph::buffer::list& ux_attrs1);
 
-    void decode_attrs(const ceph::buffer::list* ux_key1,
-                     const ceph::buffer::list* ux_attrs1);
+    DecodeAttrsResult decode_attrs(const ceph::buffer::list* ux_key1,
+                                   const ceph::buffer::list* ux_attrs1);
 
     void invalidate();
 
@@ -677,7 +720,6 @@ namespace rgw {
     {
     public:
       RGWLibFS* fs;
-      uint32_t fs_inst;
       RGWFileHandle* parent;
       const fh_key& fhk;
       std::string& name;
@@ -685,20 +727,20 @@ namespace rgw {
 
       Factory() = delete;
 
-      Factory(RGWLibFS* fs, uint32_t fs_inst, RGWFileHandle* parent,
-             const fh_key& fhk, std::string& name, uint32_t flags)
-       : fs(fs), fs_inst(fs_inst), parent(parent), fhk(fhk), name(name),
-         flags(flags) {}
+      Factory(RGWLibFS* _fs, RGWFileHandle* _parent,
+             const fh_key& _fhk, std::string& _name, uint32_t _flags)
+       : fs(_fs), parent(_parent), fhk(_fhk), name(_name),
+         flags(_flags) {}
 
       void recycle (cohort::lru::Object* o) override {
        /* re-use an existing object */
        o->~Object(); // call lru::Object virtual dtor
        // placement new!
-       new (o) RGWFileHandle(fs, fs_inst, parent, fhk, name, flags);
+       new (o) RGWFileHandle(fs, parent, fhk, name, flags);
       }
 
       cohort::lru::Object* alloc() override {
-       return new RGWFileHandle(fs, fs_inst, parent, fhk, name, flags);
+       return new RGWFileHandle(fs, parent, fhk, name, flags);
       }
     }; /* Factory */
 
@@ -732,7 +774,7 @@ namespace rgw {
   class RGWLibFS
   {
     CephContext* cct;
-    struct rgw_fs fs;
+    struct rgw_fs fs{};
     RGWFileHandle root_fh;
     rgw_fh_callback_t invalidate_cb;
     void *invalidate_arg;
@@ -751,7 +793,6 @@ namespace rgw {
     static std::atomic<uint32_t> fs_inst_counter;
 
     static uint32_t write_completion_interval_s;
-    std::string fsid;
 
     using lock_guard = std::lock_guard<std::mutex>;
     using unique_lock = std::unique_lock<std::mutex>;
@@ -820,8 +861,8 @@ namespace rgw {
     };
 
     RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
-           const char* _key)
-      : cct(_cct), root_fh(this, new_inst()), invalidate_cb(nullptr),
+           const char* _key, const char *root)
+      : cct(_cct), root_fh(this), invalidate_cb(nullptr),
        invalidate_arg(nullptr), shutdown(false), refcnt(1),
        fh_cache(cct->_conf->rgw_nfs_fhcache_partitions,
                 cct->_conf->rgw_nfs_fhcache_size),
@@ -829,17 +870,19 @@ namespace rgw {
               cct->_conf->rgw_nfs_lru_lane_hiwat),
        uid(_uid), key(_user_id, _key) {
 
-      /* no bucket may be named rgw_fs_inst-(.*) */
-      fsid = RGWFileHandle::root_name + "rgw_fs_inst-" +
-       std::to_string(get_inst());
-
-      root_fh.init_rootfs(fsid /* bucket */, RGWFileHandle::root_name);
+      if (!root || !strcmp(root, "/")) {
+        root_fh.init_rootfs(uid, RGWFileHandle::root_name, false);
+      } else {
+        root_fh.init_rootfs(uid, root, true);
+      }
 
       /* pointer to self */
       fs.fs_private = this;
 
       /* expose public root fh */
       fs.root_fh = root_fh.get_fh();
+
+      new_inst();
     }
 
     friend void intrusive_ptr_add_ref(const RGWLibFS* fs) {
@@ -982,7 +1025,7 @@ namespace rgw {
        << " (" << obj_name << ")"
        << dendl;
 
-      fh_key fhk = parent->make_fhk(key_name);
+      fh_key fhk = parent->make_fhk(obj_name);
 
     retry:
       RGWFileHandle* fh =
@@ -1014,21 +1057,30 @@ namespace rgw {
            fh->mtx.unlock(); /* ! LOCKED */
       } else {
        /* make or re-use handle */
-       RGWFileHandle::Factory prototype(this, get_inst(), parent, fhk,
+       RGWFileHandle::Factory prototype(this, parent, fhk,
                                         obj_name, CREATE_FLAGS(flags));
+       uint32_t iflags{cohort::lru::FLAG_INITIAL};
        fh = static_cast<RGWFileHandle*>(
          fh_lru.insert(&prototype,
                        cohort::lru::Edge::MRU,
-                       cohort::lru::FLAG_INITIAL));
+                       iflags));
        if (fh) {
          /* lock fh (LATCHED) */
          if (flags & RGWFileHandle::FLAG_LOCK)
            fh->mtx.lock();
-         /* inserts, releasing latch */
-         fh_cache.insert_latched(fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
+         if (likely(! (iflags & cohort::lru::FLAG_RECYCLE))) {
+           /* inserts at cached insert iterator, releasing latch */
+           fh_cache.insert_latched(
+             fh, lat, RGWFileHandle::FHCache::FLAG_UNLOCK);
+         } else {
+           /* recycle step invalidates Latch */
+           fh_cache.insert(
+             fhk.fh_hk.object, fh, RGWFileHandle::FHCache::FLAG_NONE);
+           lat.lock->unlock(); /* !LATCHED */
+         }
          get<1>(fhr) |= RGWFileHandle::FLAG_CREATE;
          /* ref parent (non-initial ref cannot fail on valid object) */
-         if (! parent->is_root()) {
+         if (! parent->is_mount()) {
            (void) fh_lru.ref(parent, cohort::lru::FLAG_NONE);
          }
          goto out; /* !LATCHED */
@@ -1049,13 +1101,13 @@ namespace rgw {
     } /*  lookup_fh(RGWFileHandle*, const char *, const uint32_t) */
 
     inline void unref(RGWFileHandle* fh) {
-      if (likely(! fh->is_root())) {
+      if (likely(! fh->is_mount())) {
        (void) fh_lru.unref(fh, cohort::lru::FLAG_NONE);
       }
     }
 
     inline RGWFileHandle* ref(RGWFileHandle* fh) {
-      if (likely(! fh->is_root())) {
+      if (likely(! fh->is_mount())) {
        fh_lru.ref(fh, cohort::lru::FLAG_NONE);
       }
       return fh;
@@ -1066,6 +1118,8 @@ namespace rgw {
     int setattr(RGWFileHandle* rgw_fh, struct stat* st, uint32_t mask,
                uint32_t flags);
 
+    void update_fh(RGWFileHandle *rgw_fh);
+
     LookupFHResult stat_bucket(RGWFileHandle* parent, const char *path,
                               RGWLibFS::BucketStats& bs,
                               uint32_t flags);
@@ -1146,7 +1200,7 @@ namespace rgw {
 
     struct rgw_fs* get_fs() { return &fs; }
 
-    uint32_t get_inst() { return root_fh.state.dev; }
+    uint64_t get_fsid() { return root_fh.state.dev; }
 
     RGWUserInfo* get_user() { return &user; }
 
@@ -1173,20 +1227,32 @@ class RGWListBucketsRequest : public RGWLibRequest,
 {
 public:
   RGWFileHandle* rgw_fh;
-  uint64_t* offset;
+  RGWFileHandle::readdir_offset offset;
   void* cb_arg;
   rgw_readdir_cb rcb;
+  uint64_t* ioff;
   size_t ix;
   uint32_t d_count;
 
   RGWListBucketsRequest(CephContext* _cct, RGWUserInfo *_user,
                        RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
-                       void* _cb_arg, uint64_t* _offset)
+                       void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
-    const auto& mk = rgw_fh->find_marker(*offset);
-    if (mk) {
-      marker = mk->name;
+      cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
+
+    using boost::get;
+
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      ioff = get<uint64_t*>(offset);
+      const auto& mk = rgw_fh->find_marker(*ioff);
+      if (mk) {
+       marker = mk->name;
+      }
+    } else {
+      const char* mk = get<const char*>(offset);
+      if (mk) {
+       marker = mk;
+      }
     }
     op = this;
   }
@@ -1257,7 +1323,9 @@ public:
   int operator()(const boost::string_ref& name,
                 const boost::string_ref& marker) {
     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
-    *offset = off;
+    if (!! ioff) {
+      *ioff = off;
+    }
     /* update traversal cache */
     rgw_fh->add_marker(off, rgw_obj_key{marker.data(), ""},
                       RGW_FS_TYPE_DIRECTORY);
@@ -1266,9 +1334,15 @@ public:
   }
 
   bool eof() {
-    lsubdout(cct, rgw, 15) << "READDIR offset: " << *offset
-                          << " is_truncated: " << is_truncated
-                          << dendl;
+    if (unlikely(cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15))) {
+      bool is_offset =
+       unlikely(! get<const char*>(&offset)) ||
+       !! get<const char*>(offset);
+      lsubdout(cct, rgw, 15) << "READDIR offset: " <<
+       ((is_offset) ? offset : "(nil)")
+                            << " is_truncated: " << is_truncated
+                            << dendl;
+    }
     return !is_truncated;
   }
 
@@ -1283,26 +1357,42 @@ class RGWReaddirRequest : public RGWLibRequest,
 {
 public:
   RGWFileHandle* rgw_fh;
-  uint64_t* offset;
+  RGWFileHandle::readdir_offset offset;
   void* cb_arg;
   rgw_readdir_cb rcb;
+  uint64_t* ioff;
   size_t ix;
   uint32_t d_count;
 
   RGWReaddirRequest(CephContext* _cct, RGWUserInfo *_user,
                    RGWFileHandle* _rgw_fh, rgw_readdir_cb _rcb,
-                   void* _cb_arg, uint64_t* _offset)
+                   void* _cb_arg, RGWFileHandle::readdir_offset& _offset)
     : RGWLibRequest(_cct, _user), rgw_fh(_rgw_fh), offset(_offset),
-      cb_arg(_cb_arg), rcb(_rcb), ix(0), d_count(0) {
-    const auto& mk = rgw_fh->find_marker(*offset);
-    if (mk) {
-      marker = *mk;
+      cb_arg(_cb_arg), rcb(_rcb), ioff(nullptr), ix(0), d_count(0) {
+
+    using boost::get;
+
+    if (unlikely(!! get<uint64_t*>(&offset))) {
+      ioff = get<uint64_t*>(offset);
+      const auto& mk = rgw_fh->find_marker(*ioff);
+      if (mk) {
+       marker = *mk;
+      }
+    } else {
+      const char* mk = get<const char*>(offset);
+      if (mk) {
+       std::string tmark{rgw_fh->relative_object_name()};
+       tmark += "/";
+       tmark += mk;    
+       marker = rgw_obj_key{std::move(tmark), "", ""};
+      }
     }
+
     default_max = 1000; // XXX was being omitted
     op = this;
   }
 
-  bool only_bucket() override { return false; }
+  bool only_bucket() override { return true; }
 
   int op_init() override {
     // assign store, s, and dialect_handler
@@ -1346,7 +1436,9 @@ public:
 
     /* hash offset of name in parent (short name) for NFS readdir cookie */
     uint64_t off = XXH64(name.data(), name.length(), fh_key::seed);
-    *offset = off;
+    if (unlikely(!! ioff)) {
+      *ioff = off;
+    }
     /* update traversal cache */
     rgw_fh->add_marker(off, marker, type);
     ++d_count;
@@ -1436,10 +1528,16 @@ public:
   }
 
   bool eof() {
-    lsubdout(cct, rgw, 15) << "READDIR offset: " << *offset
-                          << " next marker: " << next_marker
-                          << " is_truncated: " << is_truncated
-                          << dendl;
+    if (unlikely(cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15))) {
+      bool is_offset =
+       unlikely(! get<const char*>(&offset)) ||
+       !! get<const char*>(offset);
+      lsubdout(cct, rgw, 15) << "READDIR offset: " <<
+       ((is_offset) ? offset : "(nil)")
+                            << " next marker: " << next_marker
+                            << " is_truncated: " << is_truncated
+                            << dendl;
+    }
     return !is_truncated;
   }
 
@@ -1465,7 +1563,7 @@ public:
     op = this;
   }
 
-  bool only_bucket() override { return false; }
+  bool only_bucket() override { return true; }
 
   int op_init() override {
     // assign store, s, and dialect_handler
@@ -2081,7 +2179,7 @@ public:
     op = this;
   }
 
-  bool only_bucket() override { return false; }
+  bool only_bucket() override { return true; }
 
   int op_init() override {
     // assign store, s, and dialect_handler
@@ -2175,7 +2273,10 @@ public:
   const std::string& bucket_name;
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
-  RGWPutObjProcessor *processor;
+  RGWPutObjProcessor* processor;
+  RGWPutObjDataProcessor* filter;
+  boost::optional<RGWPutObj_Compress> compressor;
+  CompressorRef plugin;
   buffer::list data;
   uint64_t timer_id;
   MD5 hash;
@@ -2187,8 +2288,8 @@ public:
   RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
                  const std::string& _bname, const std::string& _oname)
     : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
-      rgw_fh(_fh), processor(nullptr), real_ofs(0), bytes_written(0),
-      multipart(false), eio(false) {
+      rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
+      bytes_written(0), multipart(false), eio(false) {
 
     int ret = header_init();
     if (ret == 0) {
@@ -2341,8 +2442,7 @@ public:
     /* XXX and fixup key attr (could optimize w/string ref and
      * dest_object) */
     buffer::list ux_key;
-    std::string key_name{dst_parent->make_key_name(dst_name.c_str())};
-    fh_key fhk = dst_parent->make_fhk(key_name);
+    fh_key fhk = dst_parent->make_fhk(dst_name);
     rgw::encode(fhk, ux_key);
     emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
 
@@ -2428,6 +2528,49 @@ public:
 
 }; /* RGWSetAttrsRequest */
 
+/*
+ * Send request to get the rados cluster stats
+ */
+class RGWGetClusterStatReq : public RGWLibRequest,
+        public RGWGetClusterStat {
+public:
+  struct rados_cluster_stat_t& stats_req;
+  RGWGetClusterStatReq(CephContext* _cct,RGWUserInfo *_user,
+                       rados_cluster_stat_t& _stats):
+  RGWLibRequest(_cct, _user), stats_req(_stats){
+    op = this;
+  }
+
+  int op_init() override {
+    // assign store, s, and dialect_handler
+    RGWObjectCtx* rados_ctx
+      = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
+    // framework promises to call op_init after parent init
+    assert(rados_ctx);
+    RGWOp::init(rados_ctx->store, get_state(), this);
+    op = this; // assign self as op: REQUIRED
+    return 0;
+  }
+
+  int header_init() override {
+    struct req_state* s = get_state();
+    s->info.method = "GET";
+    s->op = OP_GET;
+    s->user = user;
+    return 0;
+  }
+
+  int get_params() override { return 0; }
+  bool only_bucket() override { return false; }
+  void send_response() override {
+    stats_req.kb = stats_op.kb;
+    stats_req.kb_avail = stats_op.kb_avail;
+    stats_req.kb_used = stats_op.kb_used;
+    stats_req.num_objects = stats_op.num_objects;
+  }
+}; /* RGWGetClusterStatReq */
+
+
 } /* namespace rgw */
 
 #endif /* RGW_FILE_H */