2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "common/admin_socket.h"
7 #include "svc_sys_obj_cache.h"
9 #include "svc_notify.h"
11 #include "rgw/rgw_zone.h"
12 #include "rgw/rgw_tools.h"
14 #define dout_subsys ceph_subsys_rgw
16 class RGWSI_SysObj_Cache_CB
: public RGWSI_Notify::CB
18 RGWSI_SysObj_Cache
*svc
;
20 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
) {}
21 int watch_cb(uint64_t notify_id
,
25 return svc
->watch_cb(notify_id
, cookie
, notifier_id
, bl
);
28 void set_enabled(bool status
) {
29 svc
->set_enabled(status
);
33 int RGWSI_SysObj_Cache::do_start()
35 int r
= asocket
.start();
40 r
= RGWSI_SysObj_Core::do_start();
45 r
= notify_svc
->start();
50 assert(notify_svc
->is_started());
52 cb
.reset(new RGWSI_SysObj_Cache_CB(this));
54 notify_svc
->register_watch_cb(cb
.get());
59 void RGWSI_SysObj_Cache::shutdown()
62 RGWSI_SysObj_Core::shutdown();
65 static string
normal_name(rgw_pool
& pool
, const std::string
& oid
) {
67 buf
.reserve(pool
.name
.size() + pool
.ns
.size() + oid
.size() + 2);
68 buf
.append(pool
.name
).append("+").append(pool
.ns
).append("+").append(oid
);
72 void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool
& src_pool
, const string
& src_obj
, rgw_pool
& dst_pool
, string
& dst_obj
)
78 dst_pool
= zone_svc
->get_zone_params().domain_root
;
79 dst_obj
= src_pool
.name
;
84 int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase
& obj_ctx
,
85 RGWObjVersionTracker
*objv_tracker
,
86 const rgw_raw_obj
& obj
,
92 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
94 string name
= normal_name(pool
, oid
);
98 int r
= distribute_cache(name
, obj
, info
, REMOVE_OBJ
, y
);
100 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): failed to distribute cache: r=" << r
<< dendl
;
103 return RGWSI_SysObj_Core::remove(obj_ctx
, objv_tracker
, obj
, y
);
106 int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase
& obj_ctx
,
107 RGWSI_SysObj_Obj_GetObjState
& read_state
,
108 RGWObjVersionTracker
*objv_tracker
,
109 const rgw_raw_obj
& obj
,
110 bufferlist
*obl
, off_t ofs
, off_t end
,
111 map
<string
, bufferlist
> *attrs
,
113 rgw_cache_entry_info
*cache_info
,
114 boost::optional
<obj_version
> refresh_version
,
120 return RGWSI_SysObj_Core::read(obj_ctx
, read_state
, objv_tracker
,
121 obj
, obl
, ofs
, end
, attrs
, raw_attrs
,
122 cache_info
, refresh_version
, y
);
125 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
126 string name
= normal_name(pool
, oid
);
128 ObjectCacheInfo info
;
130 uint32_t flags
= (end
!= 0 ? CACHE_FLAG_DATA
: 0);
132 flags
|= CACHE_FLAG_OBJV
;
134 flags
|= CACHE_FLAG_XATTRS
;
136 int r
= cache
.get(name
, info
, flags
, cache_info
);
138 (!refresh_version
|| !info
.version
.compare(&(*refresh_version
)))) {
142 bufferlist
& bl
= info
.data
;
144 bufferlist::iterator i
= bl
.begin();
150 objv_tracker
->read_version
= info
.version
;
153 *attrs
= info
.xattrs
;
155 rgw_filter_attrset(info
.xattrs
, RGW_ATTR_PREFIX
, attrs
);
158 return obl
->length();
163 map
<string
, bufferlist
> unfiltered_attrset
;
164 r
= RGWSI_SysObj_Core::read(obj_ctx
, read_state
, objv_tracker
,
166 (attrs
? &unfiltered_attrset
: nullptr),
167 true, /* cache unfiltered attrs */
171 if (r
== -ENOENT
) { // only update ENOENT, we'd rather retry other errors
173 cache
.put(name
, info
, cache_info
);
178 if (obl
->length() == end
+ 1) {
179 /* in this case, most likely object contains more data, we can't cache it */
180 flags
&= ~CACHE_FLAG_DATA
;
183 bufferlist
& bl
= info
.data
;
185 bufferlist::iterator o
= obl
->begin();
192 info
.version
= objv_tracker
->read_version
;
195 info
.xattrs
= std::move(unfiltered_attrset
);
197 *attrs
= info
.xattrs
;
199 rgw_filter_attrset(info
.xattrs
, RGW_ATTR_PREFIX
, attrs
);
202 cache
.put(name
, info
, cache_info
);
206 int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj
& obj
,
207 const char *attr_name
,
214 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
215 string name
= normal_name(pool
, oid
);
217 ObjectCacheInfo info
;
219 uint32_t flags
= CACHE_FLAG_XATTRS
;
221 int r
= cache
.get(name
, info
, flags
, nullptr);
226 auto iter
= info
.xattrs
.find(attr_name
);
227 if (iter
== info
.xattrs
.end()) {
231 *dest
= iter
->second
;
232 return dest
->length();
233 } else if (r
== -ENODATA
) {
236 /* don't try to cache this one */
237 return RGWSI_SysObj_Core::get_attr(obj
, attr_name
, dest
, y
);
240 int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj
& obj
,
241 map
<string
, bufferlist
>& attrs
,
242 map
<string
, bufferlist
> *rmattrs
,
243 RGWObjVersionTracker
*objv_tracker
,
248 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
249 ObjectCacheInfo info
;
252 info
.rm_xattrs
= *rmattrs
;
255 info
.flags
= CACHE_FLAG_MODIFY_XATTRS
;
256 int ret
= RGWSI_SysObj_Core::set_attrs(obj
, attrs
, rmattrs
, objv_tracker
, y
);
257 string name
= normal_name(pool
, oid
);
259 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
260 info
.version
= objv_tracker
->read_version
;
261 info
.flags
|= CACHE_FLAG_OBJV
;
263 cache
.put(name
, info
, NULL
);
264 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
, y
);
266 ldout(cct
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
274 int RGWSI_SysObj_Cache::write(const rgw_raw_obj
& obj
,
276 map
<std::string
, bufferlist
>& attrs
,
278 const bufferlist
& data
,
279 RGWObjVersionTracker
*objv_tracker
,
285 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
286 ObjectCacheInfo info
;
290 info
.flags
= CACHE_FLAG_XATTRS
| CACHE_FLAG_DATA
| CACHE_FLAG_META
;
291 ceph::real_time result_mtime
;
292 int ret
= RGWSI_SysObj_Core::write(obj
, &result_mtime
, attrs
,
294 objv_tracker
, set_mtime
, y
);
296 *pmtime
= result_mtime
;
298 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
299 info
.version
= objv_tracker
->read_version
;
300 info
.flags
|= CACHE_FLAG_OBJV
;
302 info
.meta
.mtime
= result_mtime
;
303 info
.meta
.size
= data
.length();
304 string name
= normal_name(pool
, oid
);
306 cache
.put(name
, info
, NULL
);
307 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
, y
);
309 ldout(cct
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
317 int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj
& obj
,
318 const bufferlist
& data
,
320 RGWObjVersionTracker
*objv_tracker
,
325 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
327 ObjectCacheInfo info
;
329 info
.meta
.size
= data
.length();
331 info
.flags
= CACHE_FLAG_DATA
;
333 int ret
= RGWSI_SysObj_Core::write_data(obj
, data
, exclusive
, objv_tracker
, y
);
334 string name
= normal_name(pool
, oid
);
336 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
337 info
.version
= objv_tracker
->read_version
;
338 info
.flags
|= CACHE_FLAG_OBJV
;
340 cache
.put(name
, info
, NULL
);
341 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
, y
);
343 ldout(cct
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
351 int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj
& obj
, uint64_t *psize
, real_time
*pmtime
, uint64_t *pepoch
,
352 map
<string
, bufferlist
> *attrs
, bufferlist
*first_chunk
,
353 RGWObjVersionTracker
*objv_tracker
,
358 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
360 string name
= normal_name(pool
, oid
);
366 ObjectCacheInfo info
;
367 uint32_t flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
369 flags
|= CACHE_FLAG_OBJV
;
370 int r
= cache
.get(name
, info
, flags
, NULL
);
375 size
= info
.meta
.size
;
376 mtime
= info
.meta
.mtime
;
379 objv_tracker
->read_version
= info
.version
;
385 r
= RGWSI_SysObj_Core::raw_stat(obj
, &size
, &mtime
, &epoch
, &info
.xattrs
,
386 first_chunk
, objv_tracker
, y
);
390 cache
.put(name
, info
, NULL
);
396 info
.meta
.mtime
= mtime
;
397 info
.meta
.size
= size
;
398 info
.flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
400 info
.flags
|= CACHE_FLAG_OBJV
;
401 info
.version
= objv_tracker
->read_version
;
403 cache
.put(name
, info
, NULL
);
412 *attrs
= info
.xattrs
;
416 int RGWSI_SysObj_Cache::distribute_cache(const string
& normal_name
,
417 const rgw_raw_obj
& obj
,
418 ObjectCacheInfo
& obj_info
, int op
,
421 RGWCacheNotifyInfo info
;
423 info
.obj_info
= obj_info
;
425 return notify_svc
->distribute(normal_name
, info
, y
);
428 int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id
,
430 uint64_t notifier_id
,
433 RGWCacheNotifyInfo info
;
436 auto iter
= bl
.cbegin();
438 } catch (buffer::end_of_buffer
& err
) {
439 ldout(cct
, 0) << "ERROR: got bad notification" << dendl
;
441 } catch (buffer::error
& err
) {
442 ldout(cct
, 0) << "ERROR: buffer::error" << dendl
;
448 normalize_pool_and_obj(info
.obj
.pool
, info
.obj
.oid
, pool
, oid
);
449 string name
= normal_name(pool
, oid
);
453 cache
.put(name
, info
.obj_info
, NULL
);
459 ldout(cct
, 0) << "WARNING: got unknown notification op: " << info
.op
<< dendl
;
466 void RGWSI_SysObj_Cache::set_enabled(bool status
)
468 cache
.set_enabled(status
);
471 bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list
<rgw_cache_entry_info
*> cache_info_entries
,
472 RGWChainedCache::Entry
*chained_entry
)
474 return cache
.chain_cache_entry(cache_info_entries
, chained_entry
);
477 void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache
*cc
)
479 cache
.chain_cache(cc
);
482 void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache
*cc
)
484 cache
.unchain_cache(cc
);
487 static void cache_list_dump_helper(Formatter
* f
,
488 const std::string
& name
,
489 const ceph::real_time mtime
,
490 const std::uint64_t size
)
492 f
->dump_string("name", name
);
493 f
->dump_string("mtime", ceph::to_iso_8601(mtime
));
494 f
->dump_unsigned("size", size
);
497 class RGWSI_SysObj_Cache_ASocketHook
: public AdminSocketHook
{
498 RGWSI_SysObj_Cache
*svc
;
500 static constexpr const char* admin_commands
[4][3] = {
502 "cache list name=filter,type=CephString,req=false",
503 "cache list [filter_str]: list object cache, possibly matching substrings" },
504 { "cache inspect name=target,type=CephString,req=true",
505 "cache inspect target: print cache element" },
506 { "cache erase name=target,type=CephString,req=true",
507 "cache erase target: erase element from cache" },
509 "cache zap: erase all elements from cache" }
513 RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
) {}
518 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
521 bufferlist
& out
) override
;
524 int RGWSI_SysObj_Cache_ASocketHook::start()
526 auto admin_socket
= svc
->ctx()->get_admin_socket();
527 for (auto cmd
: admin_commands
) {
528 int r
= admin_socket
->register_command(cmd
[0], this, cmd
[1]);
530 ldout(svc
->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
538 void RGWSI_SysObj_Cache_ASocketHook::shutdown()
540 auto admin_socket
= svc
->ctx()->get_admin_socket();
541 admin_socket
->unregister_commands(this);
544 int RGWSI_SysObj_Cache_ASocketHook::call(
545 std::string_view command
, const cmdmap_t
& cmdmap
,
550 if (command
== "cache list"sv
) {
551 std::optional
<std::string
> filter
;
552 if (auto i
= cmdmap
.find("filter"); i
!= cmdmap
.cend()) {
553 filter
= boost::get
<std::string
>(i
->second
);
555 f
->open_array_section("cache_entries");
556 svc
->asocket
.call_list(filter
, f
);
559 } else if (command
== "cache inspect"sv
) {
560 const auto& target
= boost::get
<std::string
>(cmdmap
.at("target"));
561 if (svc
->asocket
.call_inspect(target
, f
)) {
564 ss
<< "Unable to find entry "s
+ target
+ ".\n";
567 } else if (command
== "cache erase"sv
) {
568 const auto& target
= boost::get
<std::string
>(cmdmap
.at("target"));
569 if (svc
->asocket
.call_erase(target
)) {
572 ss
<< "Unable to find entry "s
+ target
+ ".\n";
575 } else if (command
== "cache zap"sv
) {
576 svc
->asocket
.call_zap();
582 RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
)
584 hook
.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc
));
587 RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
591 int RGWSI_SysObj_Cache::ASocketHandler::start()
593 return hook
->start();
596 void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
598 return hook
->shutdown();
601 void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional
<std::string
>& filter
, Formatter
* f
)
604 [&filter
, f
] (const string
& name
, const ObjectCacheEntry
& entry
) {
605 if (!filter
|| name
.find(*filter
) != name
.npos
) {
606 cache_list_dump_helper(f
, name
, entry
.info
.meta
.mtime
,
607 entry
.info
.meta
.size
);
612 int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string
& target
, Formatter
* f
)
614 if (const auto entry
= svc
->cache
.get(target
)) {
615 f
->open_object_section("cache_entry");
616 f
->dump_string("name", target
.c_str());
625 int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string
& target
)
627 return svc
->cache
.remove(target
);
630 int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
632 svc
->cache
.invalidate_all();