+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "common/admin_socket.h"
+
#include "svc_sys_obj_cache.h"
#include "svc_zone.h"
#include "svc_notify.h"
int RGWSI_SysObj_Cache::do_start()
{
- int r = RGWSI_SysObj_Core::do_start();
+ int r = asocket.start();
+ if (r < 0) {
+ return r;
+ }
+
+ r = RGWSI_SysObj_Core::do_start();
if (r < 0) {
return r;
}
return 0;
}
+void RGWSI_SysObj_Cache::shutdown()
+{
+ asocket.shutdown();
+ RGWSI_SysObj_Core::shutdown();
+}
+
static string normal_name(rgw_pool& pool, const std::string& oid) {
std::string buf;
buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
RGWObjVersionTracker *objv_tracker,
- const rgw_raw_obj& obj)
+ const rgw_raw_obj& obj,
+ optional_yield y)
{
rgw_pool pool;
cache.remove(name);
ObjectCacheInfo info;
- int r = distribute_cache(name, obj, info, REMOVE_OBJ);
+ int r = distribute_cache(name, obj, info, REMOVE_OBJ, y);
if (r < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
}
- return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
+ return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj, y);
}
int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
- GetObjState& read_state,
+ RGWSI_SysObj_Obj_GetObjState& read_state,
RGWObjVersionTracker *objv_tracker,
const rgw_raw_obj& obj,
bufferlist *obl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
bool raw_attrs,
rgw_cache_entry_info *cache_info,
- boost::optional<obj_version> refresh_version)
+ boost::optional<obj_version> refresh_version,
+ optional_yield y)
{
rgw_pool pool;
string oid;
if (ofs != 0) {
return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
- obj, obl, ofs, end, attrs, raw_attrs,
- cache_info, refresh_version);
+ obj, obl, ofs, end, attrs, raw_attrs,
+ cache_info, refresh_version, y);
}
normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
(attrs ? &unfiltered_attrset : nullptr),
true, /* cache unfiltered attrs */
cache_info,
- refresh_version);
+ refresh_version, y);
if (r < 0) {
if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
info.status = r;
}
int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj,
- const char *attr_name,
- bufferlist *dest)
+ const char *attr_name,
+ bufferlist *dest,
+ optional_yield y)
{
rgw_pool pool;
string oid;
return dest->length();
}
/* don't try to cache this one */
- return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest);
+ return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest, y);
}
int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj,
map<string, bufferlist>& attrs,
map<string, bufferlist> *rmattrs,
- RGWObjVersionTracker *objv_tracker)
+ RGWObjVersionTracker *objv_tracker,
+ optional_yield y)
{
rgw_pool pool;
string oid;
info.version = objv_tracker->write_version;
info.flags |= CACHE_FLAG_OBJV;
}
- int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker);
+ int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker, y);
string name = normal_name(pool, oid);
if (ret >= 0) {
cache.put(name, info, NULL);
- int r = distribute_cache(name, obj, info, UPDATE_OBJ);
+ int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
if (r < 0)
ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
bool exclusive,
const bufferlist& data,
RGWObjVersionTracker *objv_tracker,
- real_time set_mtime)
+ real_time set_mtime,
+ optional_yield y)
{
rgw_pool pool;
string oid;
}
ceph::real_time result_mtime;
int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs,
- exclusive, data,
- objv_tracker, set_mtime);
+ exclusive, data,
+ objv_tracker, set_mtime, y);
if (pmtime) {
*pmtime = result_mtime;
}
// will need that system object in the near-term and b) it
// generates additional network traffic.
if (!exclusive) {
- int r = distribute_cache(name, obj, info, UPDATE_OBJ);
+ int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
if (r < 0)
ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
}
int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
const bufferlist& data,
bool exclusive,
- RGWObjVersionTracker *objv_tracker)
+ RGWObjVersionTracker *objv_tracker,
+ optional_yield y)
{
rgw_pool pool;
string oid;
info.version = objv_tracker->write_version;
info.flags |= CACHE_FLAG_OBJV;
}
- int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker);
+ int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker, y);
string name = normal_name(pool, oid);
if (ret >= 0) {
cache.put(name, info, NULL);
- int r = distribute_cache(name, obj, info, UPDATE_OBJ);
+ int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
if (r < 0)
ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
map<string, bufferlist> *attrs, bufferlist *first_chunk,
- RGWObjVersionTracker *objv_tracker)
+ RGWObjVersionTracker *objv_tracker,
+ optional_yield y)
{
rgw_pool pool;
string oid;
objv_tracker->read_version = info.version;
goto done;
}
- r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
+ r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs,
+ first_chunk, objv_tracker, y);
if (r < 0) {
if (r == -ENOENT) {
info.status = r;
return 0;
}
-int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
+int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name,
+ const rgw_raw_obj& obj,
+ ObjectCacheInfo& obj_info, int op,
+ optional_yield y)
{
RGWCacheNotifyInfo info;
-
info.op = op;
-
info.obj_info = obj_info;
info.obj = obj;
bufferlist bl;
encode(info, bl);
- return notify_svc->distribute(normal_name, bl);
+ return notify_svc->distribute(normal_name, bl, y);
}
int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
f->dump_unsigned("size", size);
}
-void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
+class RGWSI_SysObj_Cache_ASocketHook : public AdminSocketHook {
+ RGWSI_SysObj_Cache *svc;
+
+ static constexpr const char* admin_commands[4][3] = {
+ { "cache list",
+ "cache list name=filter,type=CephString,req=false",
+ "cache list [filter_str]: list object cache, possibly matching substrings" },
+ { "cache inspect name=target,type=CephString,req=true",
+ "cache inspect target: print cache element" },
+ { "cache erase name=target,type=CephString,req=true",
+ "cache erase target: erase element from cache" },
+ { "cache zap",
+ "cache zap: erase all elements from cache" }
+ };
+
+public:
+ RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
+
+ int start();
+ void shutdown();
+
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& ss,
+ bufferlist& out) override;
+};
+
+int RGWSI_SysObj_Cache_ASocketHook::start()
+{
+ auto admin_socket = svc->ctx()->get_admin_socket();
+ for (auto cmd : admin_commands) {
+ int r = admin_socket->register_command(cmd[0], this, cmd[1]);
+ if (r < 0) {
+ ldout(svc->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
+ << ")" << dendl;
+ return r;
+ }
+ }
+ return 0;
+}
+
+void RGWSI_SysObj_Cache_ASocketHook::shutdown()
+{
+ auto admin_socket = svc->ctx()->get_admin_socket();
+ admin_socket->unregister_commands(this);
+}
+
+int RGWSI_SysObj_Cache_ASocketHook::call(
+ std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& ss,
+ bufferlist& out)
+{
+ if (command == "cache list"sv) {
+ std::optional<std::string> filter;
+ if (auto i = cmdmap.find("filter"); i != cmdmap.cend()) {
+ filter = boost::get<std::string>(i->second);
+ }
+ f->open_array_section("cache_entries");
+ svc->asocket.call_list(filter, f);
+ f->close_section();
+ return 0;
+ } else if (command == "cache inspect"sv) {
+ const auto& target = boost::get<std::string>(cmdmap.at("target"));
+ if (svc->asocket.call_inspect(target, f)) {
+ return 0;
+ } else {
+ ss << "Unable to find entry "s + target + ".\n";
+ return -ENOENT;
+ }
+ } else if (command == "cache erase"sv) {
+ const auto& target = boost::get<std::string>(cmdmap.at("target"));
+ if (svc->asocket.call_erase(target)) {
+ return 0;
+ } else {
+ ss << "Unable to find entry "s + target + ".\n";
+ return -ENOENT;
+ }
+ } else if (command == "cache zap"sv) {
+ svc->asocket.call_zap();
+ return 0;
+ }
+ return -ENOSYS;
+}
+
+RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(RGWSI_SysObj_Cache *_svc) : svc(_svc)
+{
+ hook.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc));
+}
+
+RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
+{
+}
+
+int RGWSI_SysObj_Cache::ASocketHandler::start()
+{
+ return hook->start();
+}
+
+void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
+{
+ return hook->shutdown();
+}
+
+void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional<std::string>& filter, Formatter* f)
{
- cache.for_each(
- [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
+ svc->cache.for_each(
+ [&filter, f] (const string& name, const ObjectCacheEntry& entry) {
if (!filter || name.find(*filter) != name.npos) {
cache_list_dump_helper(f, name, entry.info.meta.mtime,
entry.info.meta.size);
});
}
-int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f)
+int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string& target, Formatter* f)
{
- if (const auto entry = cache.get(target)) {
+ if (const auto entry = svc->cache.get(target)) {
f->open_object_section("cache_entry");
f->dump_string("name", target.c_str());
entry->dump(f);
}
}
-int RGWSI_SysObj_Cache::call_erase(const std::string& target)
+int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string& target)
{
- return cache.remove(target);
+ return svc->cache.remove(target);
}
-int RGWSI_SysObj_Cache::call_zap()
+int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
{
- cache.invalidate_all();
+ svc->cache.invalidate_all();
return 0;
}