]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/services/svc_sys_obj_cache.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / services / svc_sys_obj_cache.cc
index 7c2817fed912089be080e1307deae9f21212a8d2..8df76c85b59248dc70a04224c772d1de80d57a40 100644 (file)
@@ -1,3 +1,9 @@
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "common/admin_socket.h"
+
 #include "svc_sys_obj_cache.h"
 #include "svc_zone.h"
 #include "svc_notify.h"
@@ -26,7 +32,12 @@ public:
 
 int RGWSI_SysObj_Cache::do_start()
 {
-  int r = RGWSI_SysObj_Core::do_start();
+  int r = asocket.start();
+  if (r < 0) {
+    return r;
+  }
+
+  r = RGWSI_SysObj_Core::do_start();
   if (r < 0) {
     return r;
   }
@@ -45,6 +56,12 @@ int RGWSI_SysObj_Cache::do_start()
   return 0;
 }
 
+void RGWSI_SysObj_Cache::shutdown()
+{
+  asocket.shutdown();
+  RGWSI_SysObj_Core::shutdown();
+}
+
 static string normal_name(rgw_pool& pool, const std::string& oid) {
   std::string buf;
   buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
@@ -66,7 +83,8 @@ void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const
 
 int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
                                RGWObjVersionTracker *objv_tracker,
-                               const rgw_raw_obj& obj)
+                               const rgw_raw_obj& obj,
+                               optional_yield y)
 
 {
   rgw_pool pool;
@@ -77,30 +95,31 @@ int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
   cache.remove(name);
 
   ObjectCacheInfo info;
-  int r = distribute_cache(name, obj, info, REMOVE_OBJ);
+  int r = distribute_cache(name, obj, info, REMOVE_OBJ, y);
   if (r < 0) {
     ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
   }
 
-  return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
+  return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj, y);
 }
 
 int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
-                             GetObjState& read_state,
+                             RGWSI_SysObj_Obj_GetObjState& read_state,
                              RGWObjVersionTracker *objv_tracker,
                              const rgw_raw_obj& obj,
                              bufferlist *obl, off_t ofs, off_t end,
                              map<string, bufferlist> *attrs,
                             bool raw_attrs,
                              rgw_cache_entry_info *cache_info,
-                             boost::optional<obj_version> refresh_version)
+                             boost::optional<obj_version> refresh_version,
+                             optional_yield y)
 {
   rgw_pool pool;
   string oid;
   if (ofs != 0) {
     return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
-                          obj, obl, ofs, end, attrs, raw_attrs,
-                          cache_info, refresh_version);
+                                   obj, obl, ofs, end, attrs, raw_attrs,
+                                   cache_info, refresh_version, y);
   }
 
   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
@@ -144,7 +163,7 @@ int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
                         (attrs ? &unfiltered_attrset : nullptr),
                         true, /* cache unfiltered attrs */
                         cache_info,
-                         refresh_version);
+                         refresh_version, y);
   if (r < 0) {
     if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
       info.status = r;
@@ -182,8 +201,9 @@ int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
 }
 
 int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj,
-                                const char *attr_name,
-                                bufferlist *dest)
+                                 const char *attr_name,
+                                 bufferlist *dest,
+                                 optional_yield y)
 {
   rgw_pool pool;
   string oid;
@@ -208,13 +228,14 @@ int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj,
     return dest->length();
   }
   /* don't try to cache this one */
-  return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest);
+  return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest, y);
 }
 
 int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj, 
                                   map<string, bufferlist>& attrs,
                                   map<string, bufferlist> *rmattrs,
-                                  RGWObjVersionTracker *objv_tracker) 
+                                  RGWObjVersionTracker *objv_tracker,
+                                  optional_yield y)
 {
   rgw_pool pool;
   string oid;
@@ -230,11 +251,11 @@ int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj,
     info.version = objv_tracker->write_version;
     info.flags |= CACHE_FLAG_OBJV;
   }
-  int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker);
+  int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker, y);
   string name = normal_name(pool, oid);
   if (ret >= 0) {
     cache.put(name, info, NULL);
-    int r = distribute_cache(name, obj, info, UPDATE_OBJ);
+    int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
     if (r < 0)
       ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
   } else {
@@ -250,7 +271,8 @@ int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
                              bool exclusive,
                              const bufferlist& data,
                              RGWObjVersionTracker *objv_tracker,
-                             real_time set_mtime)
+                             real_time set_mtime,
+                             optional_yield y)
 {
   rgw_pool pool;
   string oid;
@@ -266,8 +288,8 @@ int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
   }
   ceph::real_time result_mtime;
   int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs,
-                            exclusive, data,
-                            objv_tracker, set_mtime);
+                                     exclusive, data,
+                                     objv_tracker, set_mtime, y);
   if (pmtime) {
     *pmtime = result_mtime;
   }
@@ -284,7 +306,7 @@ int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
     // will need that system object in the near-term and b) it
     // generates additional network traffic.
     if (!exclusive) {
-      int r = distribute_cache(name, obj, info, UPDATE_OBJ);
+      int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
       if (r < 0)
        ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
     }
@@ -298,7 +320,8 @@ int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
 int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
                                    const bufferlist& data,
                                    bool exclusive,
-                                   RGWObjVersionTracker *objv_tracker)
+                                   RGWObjVersionTracker *objv_tracker,
+                                   optional_yield y)
 {
   rgw_pool pool;
   string oid;
@@ -314,11 +337,11 @@ int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
     info.version = objv_tracker->write_version;
     info.flags |= CACHE_FLAG_OBJV;
   }
-  int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker);
+  int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker, y);
   string name = normal_name(pool, oid);
   if (ret >= 0) {
     cache.put(name, info, NULL);
-    int r = distribute_cache(name, obj, info, UPDATE_OBJ);
+    int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
     if (r < 0)
       ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
   } else {
@@ -330,7 +353,8 @@ int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
 
 int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
                                  map<string, bufferlist> *attrs, bufferlist *first_chunk,
-                                 RGWObjVersionTracker *objv_tracker)
+                                 RGWObjVersionTracker *objv_tracker,
+                                 optional_yield y)
 {
   rgw_pool pool;
   string oid;
@@ -358,7 +382,8 @@ int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_t
       objv_tracker->read_version = info.version;
     goto done;
   }
-  r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
+  r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs,
+                                  first_chunk, objv_tracker, y);
   if (r < 0) {
     if (r == -ENOENT) {
       info.status = r;
@@ -388,17 +413,18 @@ done:
   return 0;
 }
 
-int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
+int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name,
+                                         const rgw_raw_obj& obj,
+                                         ObjectCacheInfo& obj_info, int op,
+                                         optional_yield y)
 {
   RGWCacheNotifyInfo info;
-
   info.op = op;
-
   info.obj_info = obj_info;
   info.obj = obj;
   bufferlist bl;
   encode(info, bl);
-  return notify_svc->distribute(normal_name, bl);
+  return notify_svc->distribute(normal_name, bl, y);
 }
 
 int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
@@ -470,10 +496,114 @@ static void cache_list_dump_helper(Formatter* f,
   f->dump_unsigned("size", size);
 }
 
-void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
+class RGWSI_SysObj_Cache_ASocketHook : public AdminSocketHook {
+  RGWSI_SysObj_Cache *svc;
+
+  static constexpr const char* admin_commands[4][3] = {
+    { "cache list",
+      "cache list name=filter,type=CephString,req=false",
+      "cache list [filter_str]: list object cache, possibly matching substrings" },
+    { "cache inspect name=target,type=CephString,req=true",
+      "cache inspect target: print cache element" },
+    { "cache erase name=target,type=CephString,req=true",
+      "cache erase target: erase element from cache" },
+    { "cache zap",
+      "cache zap: erase all elements from cache" }
+  };
+
+public:
+    RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
+
+    int start();
+    void shutdown();
+
+    int call(std::string_view command, const cmdmap_t& cmdmap,
+            Formatter *f,
+            std::ostream& ss,
+            bufferlist& out) override;
+};
+
+int RGWSI_SysObj_Cache_ASocketHook::start()
+{
+  auto admin_socket = svc->ctx()->get_admin_socket();
+  for (auto cmd : admin_commands) {
+    int r = admin_socket->register_command(cmd[0], this, cmd[1]);
+    if (r < 0) {
+      ldout(svc->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
+                           << ")" << dendl;
+      return r;
+    }
+  }
+  return 0;
+}
+
+void RGWSI_SysObj_Cache_ASocketHook::shutdown()
+{
+  auto admin_socket = svc->ctx()->get_admin_socket();
+  admin_socket->unregister_commands(this);
+}
+
+int RGWSI_SysObj_Cache_ASocketHook::call(
+  std::string_view command, const cmdmap_t& cmdmap,
+  Formatter *f,
+  std::ostream& ss,
+  bufferlist& out)
+{
+  if (command == "cache list"sv) {
+    std::optional<std::string> filter;
+    if (auto i = cmdmap.find("filter"); i != cmdmap.cend()) {
+      filter = boost::get<std::string>(i->second);
+    }
+    f->open_array_section("cache_entries");
+    svc->asocket.call_list(filter, f);
+    f->close_section();
+    return 0;
+  } else if (command == "cache inspect"sv) {
+    const auto& target = boost::get<std::string>(cmdmap.at("target"));
+    if (svc->asocket.call_inspect(target, f)) {
+      return 0;
+    } else {
+      ss << "Unable to find entry "s + target + ".\n";
+      return -ENOENT;
+    }
+  } else if (command == "cache erase"sv) {
+    const auto& target = boost::get<std::string>(cmdmap.at("target"));
+    if (svc->asocket.call_erase(target)) {
+      return 0;
+    } else {
+      ss << "Unable to find entry "s + target + ".\n";
+      return -ENOENT;
+    }
+  } else if (command == "cache zap"sv) {
+    svc->asocket.call_zap();
+    return 0;
+  }
+  return -ENOSYS;
+}
+
+RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(RGWSI_SysObj_Cache *_svc) : svc(_svc)
+{
+  hook.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc));
+}
+
+RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
+{
+}
+
+int RGWSI_SysObj_Cache::ASocketHandler::start()
+{
+  return hook->start();
+}
+
+void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
+{
+  return hook->shutdown();
+}
+
+void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional<std::string>& filter, Formatter* f)
 {
-  cache.for_each(
-    [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
+  svc->cache.for_each(
+    [&filter, f] (const string& name, const ObjectCacheEntry& entry) {
       if (!filter || name.find(*filter) != name.npos) {
        cache_list_dump_helper(f, name, entry.info.meta.mtime,
                                entry.info.meta.size);
@@ -481,9 +611,9 @@ void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, For
     });
 }
 
-int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f)
+int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string& target, Formatter* f)
 {
-  if (const auto entry = cache.get(target)) {
+  if (const auto entry = svc->cache.get(target)) {
     f->open_object_section("cache_entry");
     f->dump_string("name", target.c_str());
     entry->dump(f);
@@ -494,13 +624,13 @@ int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f)
   }
 }
 
-int RGWSI_SysObj_Cache::call_erase(const std::string& target)
+int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string& target)
 {
-  return cache.remove(target);
+  return svc->cache.remove(target);
 }
 
-int RGWSI_SysObj_Cache::call_zap()
+int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
 {
-  cache.invalidate_all();
+  svc->cache.invalidate_all();
   return 0;
 }