2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "common/admin_socket.h"
7 #include "svc_sys_obj_cache.h"
9 #include "svc_notify.h"
11 #include "rgw/rgw_zone.h"
12 #include "rgw/rgw_tools.h"
14 #define dout_subsys ceph_subsys_rgw
16 class RGWSI_SysObj_Cache_CB
: public RGWSI_Notify::CB
18 RGWSI_SysObj_Cache
*svc
;
20 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
) {}
21 int watch_cb(uint64_t notify_id
,
25 return svc
->watch_cb(notify_id
, cookie
, notifier_id
, bl
);
28 void set_enabled(bool status
) {
29 svc
->set_enabled(status
);
33 int RGWSI_SysObj_Cache::do_start(optional_yield y
)
35 int r
= asocket
.start();
40 r
= RGWSI_SysObj_Core::do_start(y
);
45 r
= notify_svc
->start(y
);
50 assert(notify_svc
->is_started());
52 cb
.reset(new RGWSI_SysObj_Cache_CB(this));
54 notify_svc
->register_watch_cb(cb
.get());
59 void RGWSI_SysObj_Cache::shutdown()
62 RGWSI_SysObj_Core::shutdown();
65 static string
normal_name(rgw_pool
& pool
, const std::string
& oid
) {
67 buf
.reserve(pool
.name
.size() + pool
.ns
.size() + oid
.size() + 2);
68 buf
.append(pool
.name
).append("+").append(pool
.ns
).append("+").append(oid
);
72 void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool
& src_pool
, const string
& src_obj
, rgw_pool
& dst_pool
, string
& dst_obj
)
78 dst_pool
= zone_svc
->get_zone_params().domain_root
;
79 dst_obj
= src_pool
.name
;
84 int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase
& obj_ctx
,
85 RGWObjVersionTracker
*objv_tracker
,
86 const rgw_raw_obj
& obj
,
92 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
94 string name
= normal_name(pool
, oid
);
98 int r
= distribute_cache(name
, obj
, info
, REMOVE_OBJ
, y
);
100 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): failed to distribute cache: r=" << r
<< dendl
;
103 return RGWSI_SysObj_Core::remove(obj_ctx
, objv_tracker
, obj
, y
);
106 int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase
& obj_ctx
,
107 RGWSI_SysObj_Obj_GetObjState
& read_state
,
108 RGWObjVersionTracker
*objv_tracker
,
109 const rgw_raw_obj
& obj
,
110 bufferlist
*obl
, off_t ofs
, off_t end
,
111 map
<string
, bufferlist
> *attrs
,
113 rgw_cache_entry_info
*cache_info
,
114 boost::optional
<obj_version
> refresh_version
,
120 return RGWSI_SysObj_Core::read(obj_ctx
, read_state
, objv_tracker
,
121 obj
, obl
, ofs
, end
, attrs
, raw_attrs
,
122 cache_info
, refresh_version
, y
);
125 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
126 string name
= normal_name(pool
, oid
);
128 ObjectCacheInfo info
;
130 uint32_t flags
= (end
!= 0 ? CACHE_FLAG_DATA
: 0);
132 flags
|= CACHE_FLAG_OBJV
;
134 flags
|= CACHE_FLAG_XATTRS
;
136 int r
= cache
.get(name
, info
, flags
, cache_info
);
138 (!refresh_version
|| !info
.version
.compare(&(*refresh_version
)))) {
142 bufferlist
& bl
= info
.data
;
144 bufferlist::iterator i
= bl
.begin();
150 objv_tracker
->read_version
= info
.version
;
153 *attrs
= info
.xattrs
;
155 rgw_filter_attrset(info
.xattrs
, RGW_ATTR_PREFIX
, attrs
);
158 return obl
->length();
163 map
<string
, bufferlist
> unfiltered_attrset
;
164 r
= RGWSI_SysObj_Core::read(obj_ctx
, read_state
, objv_tracker
,
166 (attrs
? &unfiltered_attrset
: nullptr),
167 true, /* cache unfiltered attrs */
171 if (r
== -ENOENT
) { // only update ENOENT, we'd rather retry other errors
173 cache
.put(name
, info
, cache_info
);
178 if (obl
->length() == end
+ 1) {
179 /* in this case, most likely object contains more data, we can't cache it */
180 flags
&= ~CACHE_FLAG_DATA
;
183 bufferlist
& bl
= info
.data
;
185 bufferlist::iterator o
= obl
->begin();
192 info
.version
= objv_tracker
->read_version
;
195 info
.xattrs
= std::move(unfiltered_attrset
);
197 *attrs
= info
.xattrs
;
199 rgw_filter_attrset(info
.xattrs
, RGW_ATTR_PREFIX
, attrs
);
202 cache
.put(name
, info
, cache_info
);
206 int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj
& obj
,
207 const char *attr_name
,
214 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
215 string name
= normal_name(pool
, oid
);
217 ObjectCacheInfo info
;
219 uint32_t flags
= CACHE_FLAG_XATTRS
;
221 int r
= cache
.get(name
, info
, flags
, nullptr);
226 auto iter
= info
.xattrs
.find(attr_name
);
227 if (iter
== info
.xattrs
.end()) {
231 *dest
= iter
->second
;
232 return dest
->length();
233 } else if (r
== -ENODATA
) {
236 /* don't try to cache this one */
237 return RGWSI_SysObj_Core::get_attr(obj
, attr_name
, dest
, y
);
240 int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj
& obj
,
241 map
<string
, bufferlist
>& attrs
,
242 map
<string
, bufferlist
> *rmattrs
,
243 RGWObjVersionTracker
*objv_tracker
,
248 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
249 ObjectCacheInfo info
;
252 info
.rm_xattrs
= *rmattrs
;
255 info
.flags
= CACHE_FLAG_MODIFY_XATTRS
;
256 int ret
= RGWSI_SysObj_Core::set_attrs(obj
, attrs
, rmattrs
, objv_tracker
, y
);
257 string name
= normal_name(pool
, oid
);
259 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
260 info
.version
= objv_tracker
->read_version
;
261 info
.flags
|= CACHE_FLAG_OBJV
;
263 cache
.put(name
, info
, NULL
);
264 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
, y
);
266 ldout(cct
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
274 int RGWSI_SysObj_Cache::write(const rgw_raw_obj
& obj
,
276 map
<std::string
, bufferlist
>& attrs
,
278 const bufferlist
& data
,
279 RGWObjVersionTracker
*objv_tracker
,
285 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
286 ObjectCacheInfo info
;
290 info
.flags
= CACHE_FLAG_XATTRS
| CACHE_FLAG_DATA
| CACHE_FLAG_META
;
291 ceph::real_time result_mtime
;
292 int ret
= RGWSI_SysObj_Core::write(obj
, &result_mtime
, attrs
,
294 objv_tracker
, set_mtime
, y
);
296 *pmtime
= result_mtime
;
298 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
299 info
.version
= objv_tracker
->read_version
;
300 info
.flags
|= CACHE_FLAG_OBJV
;
302 info
.meta
.mtime
= result_mtime
;
303 info
.meta
.size
= data
.length();
304 string name
= normal_name(pool
, oid
);
306 cache
.put(name
, info
, NULL
);
307 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
, y
);
309 ldout(cct
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
317 int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj
& obj
,
318 const bufferlist
& data
,
320 RGWObjVersionTracker
*objv_tracker
,
325 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
327 ObjectCacheInfo info
;
329 info
.meta
.size
= data
.length();
331 info
.flags
= CACHE_FLAG_DATA
;
333 int ret
= RGWSI_SysObj_Core::write_data(obj
, data
, exclusive
, objv_tracker
, y
);
334 string name
= normal_name(pool
, oid
);
336 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
337 info
.version
= objv_tracker
->read_version
;
338 info
.flags
|= CACHE_FLAG_OBJV
;
340 cache
.put(name
, info
, NULL
);
341 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
, y
);
343 ldout(cct
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
351 int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj
& obj
, uint64_t *psize
, real_time
*pmtime
, uint64_t *pepoch
,
352 map
<string
, bufferlist
> *attrs
, bufferlist
*first_chunk
,
353 RGWObjVersionTracker
*objv_tracker
,
358 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
360 string name
= normal_name(pool
, oid
);
366 ObjectCacheInfo info
;
367 uint32_t flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
369 flags
|= CACHE_FLAG_OBJV
;
370 int r
= cache
.get(name
, info
, flags
, NULL
);
375 size
= info
.meta
.size
;
376 mtime
= info
.meta
.mtime
;
379 objv_tracker
->read_version
= info
.version
;
385 r
= RGWSI_SysObj_Core::raw_stat(obj
, &size
, &mtime
, &epoch
, &info
.xattrs
,
386 first_chunk
, objv_tracker
, y
);
390 cache
.put(name
, info
, NULL
);
396 info
.meta
.mtime
= mtime
;
397 info
.meta
.size
= size
;
398 info
.flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
400 info
.flags
|= CACHE_FLAG_OBJV
;
401 info
.version
= objv_tracker
->read_version
;
403 cache
.put(name
, info
, NULL
);
412 *attrs
= info
.xattrs
;
416 int RGWSI_SysObj_Cache::distribute_cache(const string
& normal_name
,
417 const rgw_raw_obj
& obj
,
418 ObjectCacheInfo
& obj_info
, int op
,
421 RGWCacheNotifyInfo info
;
423 info
.obj_info
= obj_info
;
427 return notify_svc
->distribute(normal_name
, bl
, y
);
430 int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id
,
432 uint64_t notifier_id
,
435 RGWCacheNotifyInfo info
;
438 auto iter
= bl
.cbegin();
440 } catch (buffer::end_of_buffer
& err
) {
441 ldout(cct
, 0) << "ERROR: got bad notification" << dendl
;
443 } catch (buffer::error
& err
) {
444 ldout(cct
, 0) << "ERROR: buffer::error" << dendl
;
450 normalize_pool_and_obj(info
.obj
.pool
, info
.obj
.oid
, pool
, oid
);
451 string name
= normal_name(pool
, oid
);
455 cache
.put(name
, info
.obj_info
, NULL
);
461 ldout(cct
, 0) << "WARNING: got unknown notification op: " << info
.op
<< dendl
;
468 void RGWSI_SysObj_Cache::set_enabled(bool status
)
470 cache
.set_enabled(status
);
473 bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list
<rgw_cache_entry_info
*> cache_info_entries
,
474 RGWChainedCache::Entry
*chained_entry
)
476 return cache
.chain_cache_entry(cache_info_entries
, chained_entry
);
479 void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache
*cc
)
481 cache
.chain_cache(cc
);
484 void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache
*cc
)
486 cache
.unchain_cache(cc
);
489 static void cache_list_dump_helper(Formatter
* f
,
490 const std::string
& name
,
491 const ceph::real_time mtime
,
492 const std::uint64_t size
)
494 f
->dump_string("name", name
);
495 f
->dump_string("mtime", ceph::to_iso_8601(mtime
));
496 f
->dump_unsigned("size", size
);
499 class RGWSI_SysObj_Cache_ASocketHook
: public AdminSocketHook
{
500 RGWSI_SysObj_Cache
*svc
;
502 static constexpr std::string_view admin_commands
[][2] = {
503 { "cache list name=filter,type=CephString,req=false",
504 "cache list [filter_str]: list object cache, possibly matching substrings" },
505 { "cache inspect name=target,type=CephString,req=true",
506 "cache inspect target: print cache element" },
507 { "cache erase name=target,type=CephString,req=true",
508 "cache erase target: erase element from cache" },
510 "cache zap: erase all elements from cache" }
514 RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
) {}
519 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
522 bufferlist
& out
) override
;
525 int RGWSI_SysObj_Cache_ASocketHook::start()
527 auto admin_socket
= svc
->ctx()->get_admin_socket();
528 for (auto cmd
: admin_commands
) {
529 int r
= admin_socket
->register_command(cmd
[0], this, cmd
[1]);
531 ldout(svc
->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
539 void RGWSI_SysObj_Cache_ASocketHook::shutdown()
541 auto admin_socket
= svc
->ctx()->get_admin_socket();
542 admin_socket
->unregister_commands(this);
545 int RGWSI_SysObj_Cache_ASocketHook::call(
546 std::string_view command
, const cmdmap_t
& cmdmap
,
551 if (command
== "cache list"sv
) {
552 std::optional
<std::string
> filter
;
553 if (auto i
= cmdmap
.find("filter"); i
!= cmdmap
.cend()) {
554 filter
= boost::get
<std::string
>(i
->second
);
556 f
->open_array_section("cache_entries");
557 svc
->asocket
.call_list(filter
, f
);
560 } else if (command
== "cache inspect"sv
) {
561 const auto& target
= boost::get
<std::string
>(cmdmap
.at("target"));
562 if (svc
->asocket
.call_inspect(target
, f
)) {
565 ss
<< "Unable to find entry "s
+ target
+ ".\n";
568 } else if (command
== "cache erase"sv
) {
569 const auto& target
= boost::get
<std::string
>(cmdmap
.at("target"));
570 if (svc
->asocket
.call_erase(target
)) {
573 ss
<< "Unable to find entry "s
+ target
+ ".\n";
576 } else if (command
== "cache zap"sv
) {
577 svc
->asocket
.call_zap();
583 RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
)
585 hook
.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc
));
588 RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
592 int RGWSI_SysObj_Cache::ASocketHandler::start()
594 return hook
->start();
597 void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
599 return hook
->shutdown();
602 void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional
<std::string
>& filter
, Formatter
* f
)
605 [&filter
, f
] (const string
& name
, const ObjectCacheEntry
& entry
) {
606 if (!filter
|| name
.find(*filter
) != name
.npos
) {
607 cache_list_dump_helper(f
, name
, entry
.info
.meta
.mtime
,
608 entry
.info
.meta
.size
);
613 int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string
& target
, Formatter
* f
)
615 if (const auto entry
= svc
->cache
.get(target
)) {
616 f
->open_object_section("cache_entry");
617 f
->dump_string("name", target
.c_str());
626 int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string
& target
)
628 return svc
->cache
.remove(target
);
631 int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
633 svc
->cache
.invalidate_all();