2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab ft=cpp
5 #include "common/admin_socket.h"
7 #include "svc_sys_obj_cache.h"
9 #include "svc_notify.h"
11 #include "rgw/rgw_zone.h"
12 #include "rgw/rgw_tools.h"
14 #define dout_subsys ceph_subsys_rgw
// Callback adapter derived from RGWSI_Notify::CB. It holds a pointer to the
// owning RGWSI_SysObj_Cache and forwards both callbacks to it.
18 class RGWSI_SysObj_Cache_CB
: public RGWSI_Notify::CB
20 RGWSI_SysObj_Cache
*svc
;
22 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
) {}
// Forwards the notification to svc->watch_cb() and returns its result.
23 int watch_cb(const DoutPrefixProvider
*dpp
,
28 return svc
->watch_cb(dpp
, notify_id
, cookie
, notifier_id
, bl
);
// Forwards the enabled/disabled status to svc->set_enabled().
31 void set_enabled(bool status
) {
32 svc
->set_enabled(status
);
// Starts the cache service. In the visible order it starts the admin-socket
// handler (asocket), the RGWSI_SysObj_Core base service and the notify
// service, asserts that the notify service is started, then creates an
// RGWSI_SysObj_Cache_CB bound to this object and registers it as the notify
// service's watch callback.
// NOTE(review): how each intermediate `r` is checked is not visible here;
// presumably a negative value aborts the start -- confirm.
36 int RGWSI_SysObj_Cache::do_start(optional_yield y
, const DoutPrefixProvider
*dpp
)
38 int r
= asocket
.start();
43 r
= RGWSI_SysObj_Core::do_start(y
, dpp
);
48 r
= notify_svc
->start(y
, dpp
);
53 assert(notify_svc
->is_started());
// `cb` owns the callback object; the notify service only gets the raw pointer.
55 cb
.reset(new RGWSI_SysObj_Cache_CB(this));
57 notify_svc
->register_watch_cb(cb
.get());
// Shuts the service down; the visible step delegates to
// RGWSI_SysObj_Core::shutdown().
62 void RGWSI_SysObj_Cache::shutdown()
65 RGWSI_SysObj_Core::shutdown();
// Builds the cache key string for (pool, oid) as
//   pool.name + "+" + pool.ns + "+" + oid
// The buffer is reserved up front for the three parts plus the two "+"
// separators.
// NOTE(review): `pool` is taken by non-const reference although the visible
// code only reads it; presumably the result `buf` is returned -- confirm.
68 static string
normal_name(rgw_pool
& pool
, const std::string
& oid
) {
70 buf
.reserve(pool
.name
.size() + pool
.ns
.size() + oid
.size() + 2);
71 buf
.append(pool
.name
).append("+").append(pool
.ns
).append("+").append(oid
);
// Maps a source (pool, object) pair to the (dst_pool, dst_obj) pair used when
// naming cache entries.
// The visible assignments set dst_pool to the zone's domain_root pool and
// dst_obj to src_pool.name.
// NOTE(review): the condition selecting this mapping, and any alternative
// branch, are not visible here -- confirm before relying on this.
75 void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool
& src_pool
, const string
& src_obj
, rgw_pool
& dst_pool
, string
& dst_obj
)
81 dst_pool
= zone_svc
->get_zone_params().domain_root
;
82 dst_obj
= src_pool
.name
;
// Removes a system object and drops it from the cache.
// Visible steps: normalize the pool/oid and build the cache key, remove the
// local cache entry, distribute an INVALIDATE_OBJ notification carrying a
// default-constructed ObjectCacheInfo, and finally return the result of
// RGWSI_SysObj_Core::remove().
// NOTE(review): the error log below is presumably guarded by a check on `r`
// that is not visible here; the visible code does not return on that failure.
87 int RGWSI_SysObj_Cache::remove(const DoutPrefixProvider
*dpp
,
88 RGWSysObjectCtxBase
& obj_ctx
,
89 RGWObjVersionTracker
*objv_tracker
,
90 const rgw_raw_obj
& obj
,
96 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
98 string name
= normal_name(pool
, oid
);
99 cache
.invalidate_remove(dpp
, name
);
101 ObjectCacheInfo info
;
102 int r
= distribute_cache(dpp
, name
, obj
, info
, INVALIDATE_OBJ
, y
);
104 ldpp_dout(dpp
, 0) << "ERROR: " << __func__
<< "(): failed to distribute cache: r=" << r
<< dendl
;
107 return RGWSI_SysObj_Core::remove(dpp
, obj_ctx
, objv_tracker
, obj
, y
);
// Cached read of a system object.
// Visible flow:
//   1. An early pass-through to RGWSI_SysObj_Core::read() (its guard
//      condition is not visible here -- TODO confirm).
//   2. A cache lookup under the normalized name, with flags describing which
//      parts (data / object version / xattrs) are requested.
//   3. On an acceptable hit, results are served from the cached
//      ObjectCacheInfo and obl->length() is returned.
//   4. Otherwise the object is read through the core service and the cache
//      is updated from the result.
110 int RGWSI_SysObj_Cache::read(const DoutPrefixProvider
*dpp
,
111 RGWSysObjectCtxBase
& obj_ctx
,
112 RGWSI_SysObj_Obj_GetObjState
& read_state
,
113 RGWObjVersionTracker
*objv_tracker
,
114 const rgw_raw_obj
& obj
,
115 bufferlist
*obl
, off_t ofs
, off_t end
,
116 map
<string
, bufferlist
> *attrs
,
118 rgw_cache_entry_info
*cache_info
,
119 boost::optional
<obj_version
> refresh_version
,
125 return RGWSI_SysObj_Core::read(dpp
, obj_ctx
, read_state
, objv_tracker
,
126 obj
, obl
, ofs
, end
, attrs
, raw_attrs
,
127 cache_info
, refresh_version
, y
);
130 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
131 string name
= normal_name(pool
, oid
);
133 ObjectCacheInfo info
;
// Cached data is requested only when `end` is non-zero.
135 uint32_t flags
= (end
!= 0 ? CACHE_FLAG_DATA
: 0);
137 flags
|= CACHE_FLAG_OBJV
;
139 flags
|= CACHE_FLAG_XATTRS
;
141 int r
= cache
.get(dpp
, name
, info
, flags
, cache_info
);
// This part of the hit condition holds when no refresh_version was supplied,
// or when info.version.compare() against it yields a zero/false result.
143 (!refresh_version
|| !info
.version
.compare(&(*refresh_version
)))) {
147 bufferlist
& bl
= info
.data
;
149 bufferlist::iterator i
= bl
.begin();
155 objv_tracker
->read_version
= info
.version
;
// Attributes are handed back either as the full cached set or filtered by
// RGW_ATTR_PREFIX (the selecting condition is not visible here).
158 *attrs
= info
.xattrs
;
160 rgw_filter_attrset(info
.xattrs
, RGW_ATTR_PREFIX
, attrs
);
163 return obl
->length();
168 map
<string
, bufferlist
> unfiltered_attrset
;
169 r
= RGWSI_SysObj_Core::read(dpp
, obj_ctx
, read_state
, objv_tracker
,
171 (attrs
? &unfiltered_attrset
: nullptr),
172 true, /* cache unfiltered attrs */
176 if (r
== -ENOENT
) { // only update ENOENT, we'd rather retry other errors
178 cache
.put(dpp
, name
, info
, cache_info
);
183 if (obl
->length() == end
+ 1) {
184 /* in this case, most likely object contains more data, we can't cache it */
185 flags
&= ~CACHE_FLAG_DATA
;
188 bufferlist
& bl
= info
.data
;
190 bufferlist::iterator o
= obl
->begin();
197 info
.version
= objv_tracker
->read_version
;
// The unfiltered attribute set is moved into the cache entry; the caller's
// `attrs` is then filled from it (full copy or RGW_ATTR_PREFIX-filtered).
200 info
.xattrs
= std::move(unfiltered_attrset
);
202 *attrs
= info
.xattrs
;
204 rgw_filter_attrset(info
.xattrs
, RGW_ATTR_PREFIX
, attrs
);
207 cache
.put(dpp
, name
, info
, cache_info
);
// Fetches a single extended attribute of a system object.
// The cache is queried for the normalized name with CACHE_FLAG_XATTRS only.
// When the attribute is present in the cached xattrs it is copied into *dest
// and its length is returned. There is a separate branch for r == -ENODATA
// and a branch for "attribute not found in the cached set"; their bodies are
// not visible here -- TODO confirm. The final visible path reads through
// RGWSI_SysObj_Core::get_attr() without populating the cache.
211 int RGWSI_SysObj_Cache::get_attr(const DoutPrefixProvider
*dpp
,
212 const rgw_raw_obj
& obj
,
213 const char *attr_name
,
220 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
221 string name
= normal_name(pool
, oid
);
223 ObjectCacheInfo info
;
225 uint32_t flags
= CACHE_FLAG_XATTRS
;
227 int r
= cache
.get(dpp
, name
, info
, flags
, nullptr);
232 auto iter
= info
.xattrs
.find(attr_name
);
233 if (iter
== info
.xattrs
.end()) {
237 *dest
= iter
->second
;
238 return dest
->length();
239 } else if (r
== -ENODATA
) {
242 /* don't try to cache this one */
243 return RGWSI_SysObj_Core::get_attr(dpp
, obj
, attr_name
, dest
, y
);
// Sets/removes extended attributes on a system object and mirrors the change
// into the cache.
// Visible steps: build an ObjectCacheInfo flagged CACHE_FLAG_MODIFY_XATTRS
// (rm_xattrs copied from *rmattrs), perform the write through
// RGWSI_SysObj_Core::set_attrs(), and, when objv_tracker carries a non-zero
// read_version.ver, record that version and add CACHE_FLAG_OBJV. The entry is
// then put into the local cache and distributed as UPDATE_OBJ.
// NOTE(review): the guards around the rmattrs dereference, the cache update,
// the error log and the invalidate_remove() call are not visible here;
// presumably the invalidate is the failure path of the core write -- confirm.
246 int RGWSI_SysObj_Cache::set_attrs(const DoutPrefixProvider
*dpp
,
247 const rgw_raw_obj
& obj
,
248 map
<string
, bufferlist
>& attrs
,
249 map
<string
, bufferlist
> *rmattrs
,
250 RGWObjVersionTracker
*objv_tracker
,
255 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
256 ObjectCacheInfo info
;
259 info
.rm_xattrs
= *rmattrs
;
262 info
.flags
= CACHE_FLAG_MODIFY_XATTRS
;
263 int ret
= RGWSI_SysObj_Core::set_attrs(dpp
, obj
, attrs
, rmattrs
, objv_tracker
, y
);
264 string name
= normal_name(pool
, oid
);
266 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
267 info
.version
= objv_tracker
->read_version
;
268 info
.flags
|= CACHE_FLAG_OBJV
;
270 cache
.put(dpp
, name
, info
, NULL
);
271 int r
= distribute_cache(dpp
, name
, obj
, info
, UPDATE_OBJ
, y
);
273 ldpp_dout(dpp
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
275 cache
.invalidate_remove(dpp
, name
);
// Writes a system object (data plus attributes) and mirrors it into the cache.
// Visible steps: prepare an ObjectCacheInfo flagged XATTRS | DATA | META,
// write through RGWSI_SysObj_Core::write() capturing the resulting mtime,
// copy that mtime to *pmtime, record the object version (adding
// CACHE_FLAG_OBJV) when objv_tracker has a non-zero read_version.ver, fill in
// meta.mtime and meta.size (= data.length()), put the entry into the local
// cache and distribute it as UPDATE_OBJ.
// NOTE(review): the guards around the *pmtime store, the cache update, the
// error log and invalidate_remove() are not visible here -- confirm.
281 int RGWSI_SysObj_Cache::write(const DoutPrefixProvider
*dpp
,
282 const rgw_raw_obj
& obj
,
284 map
<std::string
, bufferlist
>& attrs
,
286 const bufferlist
& data
,
287 RGWObjVersionTracker
*objv_tracker
,
293 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
294 ObjectCacheInfo info
;
298 info
.flags
= CACHE_FLAG_XATTRS
| CACHE_FLAG_DATA
| CACHE_FLAG_META
;
299 ceph::real_time result_mtime
;
300 int ret
= RGWSI_SysObj_Core::write(dpp
, obj
, &result_mtime
, attrs
,
302 objv_tracker
, set_mtime
, y
);
304 *pmtime
= result_mtime
;
306 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
307 info
.version
= objv_tracker
->read_version
;
308 info
.flags
|= CACHE_FLAG_OBJV
;
310 info
.meta
.mtime
= result_mtime
;
311 info
.meta
.size
= data
.length();
312 string name
= normal_name(pool
, oid
);
314 cache
.put(dpp
, name
, info
, NULL
);
315 int r
= distribute_cache(dpp
, name
, obj
, info
, UPDATE_OBJ
, y
);
317 ldpp_dout(dpp
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
319 cache
.invalidate_remove(dpp
, name
);
// Writes only the data of a system object and mirrors it into the cache.
// Visible steps: prepare an ObjectCacheInfo with meta.size = data.length()
// and flags = CACHE_FLAG_DATA, write through
// RGWSI_SysObj_Core::write_data(), record the object version (adding
// CACHE_FLAG_OBJV) when objv_tracker has a non-zero read_version.ver, put the
// entry into the local cache and distribute it as UPDATE_OBJ.
// NOTE(review): the guards around the cache update, the error log and
// invalidate_remove() are not visible here -- confirm.
325 int RGWSI_SysObj_Cache::write_data(const DoutPrefixProvider
*dpp
,
326 const rgw_raw_obj
& obj
,
327 const bufferlist
& data
,
329 RGWObjVersionTracker
*objv_tracker
,
334 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
336 ObjectCacheInfo info
;
338 info
.meta
.size
= data
.length();
340 info
.flags
= CACHE_FLAG_DATA
;
342 int ret
= RGWSI_SysObj_Core::write_data(dpp
, obj
, data
, exclusive
, objv_tracker
, y
);
343 string name
= normal_name(pool
, oid
);
345 if (objv_tracker
&& objv_tracker
->read_version
.ver
) {
346 info
.version
= objv_tracker
->read_version
;
347 info
.flags
|= CACHE_FLAG_OBJV
;
349 cache
.put(dpp
, name
, info
, NULL
);
350 int r
= distribute_cache(dpp
, name
, obj
, info
, UPDATE_OBJ
, y
);
352 ldpp_dout(dpp
, 0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
354 cache
.invalidate_remove(dpp
, name
);
// Stats a system object, preferring cached metadata.
// Visible flow: look the normalized name up in the cache asking for
// META | XATTRS (and OBJV under a condition not visible here). On a hit,
// size, mtime and the tracked read_version are taken from the cached entry.
// Otherwise RGWSI_SysObj_Core::raw_stat() is called, writing the attributes
// directly into info.xattrs, and the cache is updated with the obtained
// mtime/size/version. At the end the caller's *attrs receives info.xattrs.
// NOTE(review): two cache.put() calls are visible; presumably the first one
// belongs to an error branch of the core stat -- confirm.
360 int RGWSI_SysObj_Cache::raw_stat(const DoutPrefixProvider
*dpp
, const rgw_raw_obj
& obj
, uint64_t *psize
, real_time
*pmtime
, uint64_t *pepoch
,
361 map
<string
, bufferlist
> *attrs
, bufferlist
*first_chunk
,
362 RGWObjVersionTracker
*objv_tracker
,
367 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
369 string name
= normal_name(pool
, oid
);
375 ObjectCacheInfo info
;
376 uint32_t flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
378 flags
|= CACHE_FLAG_OBJV
;
379 int r
= cache
.get(dpp
, name
, info
, flags
, NULL
);
384 size
= info
.meta
.size
;
385 mtime
= info
.meta
.mtime
;
388 objv_tracker
->read_version
= info
.version
;
394 r
= RGWSI_SysObj_Core::raw_stat(dpp
, obj
, &size
, &mtime
, &epoch
, &info
.xattrs
,
395 first_chunk
, objv_tracker
, y
);
399 cache
.put(dpp
, name
, info
, NULL
);
// Populate the cache entry from the values obtained by the core stat.
405 info
.meta
.mtime
= mtime
;
406 info
.meta
.size
= size
;
407 info
.flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
409 info
.flags
|= CACHE_FLAG_OBJV
;
410 info
.version
= objv_tracker
->read_version
;
412 cache
.put(dpp
, name
, info
, NULL
);
421 *attrs
= info
.xattrs
;
// Sends a cache notification through the notify service.
// Builds an RGWCacheNotifyInfo containing a copy of obj_info and returns the
// result of notify_svc->distribute() for the given normalized name.
// NOTE(review): how `op` and `obj` are stored into the notification is not
// visible here -- confirm.
425 int RGWSI_SysObj_Cache::distribute_cache(const DoutPrefixProvider
*dpp
,
426 const string
& normal_name
,
427 const rgw_raw_obj
& obj
,
428 ObjectCacheInfo
& obj_info
, int op
,
431 RGWCacheNotifyInfo info
;
433 info
.obj_info
= obj_info
;
435 return notify_svc
->distribute(dpp
, normal_name
, info
, y
);
// Handles an incoming cache notification.
// The payload `bl` is read from its beginning into an RGWCacheNotifyInfo;
// buffer::end_of_buffer and buffer::error exceptions are caught and logged.
// The notification's pool/oid are normalized to a cache key, and the local
// cache is either updated with info.obj_info or the entry is removed; an
// unrecognized info.op is logged as a warning.
// NOTE(review): the decode call, the dispatch on info.op and the return
// values are not visible here -- confirm.
438 int RGWSI_SysObj_Cache::watch_cb(const DoutPrefixProvider
*dpp
,
441 uint64_t notifier_id
,
444 RGWCacheNotifyInfo info
;
447 auto iter
= bl
.cbegin();
449 } catch (buffer::end_of_buffer
& err
) {
450 ldpp_dout(dpp
, 0) << "ERROR: got bad notification" << dendl
;
452 } catch (buffer::error
& err
) {
453 ldpp_dout(dpp
, 0) << "ERROR: buffer::error" << dendl
;
459 normalize_pool_and_obj(info
.obj
.pool
, info
.obj
.oid
, pool
, oid
);
460 string name
= normal_name(pool
, oid
);
464 cache
.put(dpp
, name
, info
.obj_info
, NULL
);
467 cache
.invalidate_remove(dpp
, name
);
470 ldpp_dout(dpp
, 0) << "WARNING: got unknown notification op: " << info
.op
<< dendl
;
// Passes the enabled/disabled status on to the underlying object cache.
477 void RGWSI_SysObj_Cache::set_enabled(bool status
)
479 cache
.set_enabled(status
);
// Thin wrapper: forwards the cache_info entries and the chained entry to
// cache.chain_cache_entry() and returns its boolean result.
482 bool RGWSI_SysObj_Cache::chain_cache_entry(const DoutPrefixProvider
*dpp
,
483 std::initializer_list
<rgw_cache_entry_info
*> cache_info_entries
,
484 RGWChainedCache::Entry
*chained_entry
)
486 return cache
.chain_cache_entry(dpp
, cache_info_entries
, chained_entry
);
// Registers a chained cache by forwarding `cc` to cache.chain_cache().
489 void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache
*cc
)
491 cache
.chain_cache(cc
);
// Unregisters a chained cache by forwarding `cc` to cache.unchain_cache().
494 void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache
*cc
)
496 cache
.unchain_cache(cc
);
// Emits the fields of one cache entry to the formatter: "name" as a string,
// "mtime" as an ISO 8601 string (via ceph::to_iso_8601) and "size" as an
// unsigned integer.
499 static void cache_list_dump_helper(Formatter
* f
,
500 const std::string
& name
,
501 const ceph::real_time mtime
,
502 const std::uint64_t size
)
504 f
->dump_string("name", name
);
505 f
->dump_string("mtime", ceph::to_iso_8601(mtime
));
506 f
->dump_unsigned("size", size
);
// Admin-socket hook exposing the object cache commands of an
// RGWSI_SysObj_Cache instance.
509 class RGWSI_SysObj_Cache_ASocketHook
: public AdminSocketHook
{
510 RGWSI_SysObj_Cache
*svc
;
// Table of {command descriptor, help text} pairs registered by start().
512 static constexpr std::string_view admin_commands
[][2] = {
513 { "cache list name=filter,type=CephString,req=false",
514 "cache list [filter_str]: list object cache, possibly matching substrings" },
515 { "cache inspect name=target,type=CephString,req=true",
516 "cache inspect target: print cache element" },
517 { "cache erase name=target,type=CephString,req=true",
518 "cache erase target: erase element from cache" },
520 "cache zap: erase all elements from cache" }
524 RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache
*_svc
) : svc(_svc
) {}
// AdminSocketHook entry point; dispatches on `command` (defined out of line).
529 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
532 bufferlist
& out
) override
;
// Registers every entry of admin_commands with the context's admin socket,
// using this hook as the handler. A registration failure is logged together
// with the returned code `r`.
// NOTE(review): the failure check and the return values are not visible
// here -- confirm.
535 int RGWSI_SysObj_Cache_ASocketHook::start()
537 auto admin_socket
= svc
->ctx()->get_admin_socket();
538 for (auto cmd
: admin_commands
) {
// cmd[0] is the command descriptor, cmd[1] its help text.
539 int r
= admin_socket
->register_command(cmd
[0], this, cmd
[1]);
541 ldout(svc
->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
// Unregisters all admin-socket commands that were registered for this hook.
549 void RGWSI_SysObj_Cache_ASocketHook::shutdown()
551 auto admin_socket
= svc
->ctx()->get_admin_socket();
552 admin_socket
->unregister_commands(this);
// Dispatches an admin-socket command to the service's ASocketHandler.
//   "cache list"    - optional "filter" argument; opens the "cache_entries"
//                     array section and calls call_list().
//   "cache inspect" - takes "target"; calls call_inspect() and, on the other
//                     branch, writes an "Unable to find entry" message to ss.
//   "cache erase"   - takes "target"; calls call_erase() and, on the other
//                     branch, writes an "Unable to find entry" message to ss.
//   "cache zap"     - calls call_zap().
// NOTE(review): the section close calls and the return value of each branch
// are not visible here -- confirm.
555 int RGWSI_SysObj_Cache_ASocketHook::call(
556 std::string_view command
, const cmdmap_t
& cmdmap
,
561 if (command
== "cache list"sv
) {
562 std::optional
<std::string
> filter
;
// The filter stays empty when the command carries no "filter" argument.
563 if (auto i
= cmdmap
.find("filter"); i
!= cmdmap
.cend()) {
564 filter
= boost::get
<std::string
>(i
->second
);
566 f
->open_array_section("cache_entries");
567 svc
->asocket
.call_list(filter
, f
);
570 } else if (command
== "cache inspect"sv
) {
571 const auto& target
= boost::get
<std::string
>(cmdmap
.at("target"));
572 if (svc
->asocket
.call_inspect(target
, f
)) {
575 ss
<< "Unable to find entry "s
+ target
+ ".\n";
578 } else if (command
== "cache erase"sv
) {
579 const auto& target
= boost::get
<std::string
>(cmdmap
.at("target"));
580 if (svc
->asocket
.call_erase(target
)) {
583 ss
<< "Unable to find entry "s
+ target
+ ".\n";
586 } else if (command
== "cache zap"sv
) {
587 svc
->asocket
.call_zap();
// Stores the log prefix provider and the owning service, then creates the
// admin-socket hook for that service (owned through `hook`).
593 RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(const DoutPrefixProvider
*_dpp
, RGWSI_SysObj_Cache
*_svc
) : dpp(_dpp
), svc(_svc
)
595 hook
.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc
));
// Destructor of the admin-socket handler.
598 RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
// Starts the admin-socket hook and returns its result.
602 int RGWSI_SysObj_Cache::ASocketHandler::start()
604 return hook
->start();
// Shuts the admin-socket hook down.
607 void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
609 return hook
->shutdown();
// Dumps cache entries to the formatter.
// The lambda receives (name, entry) and dumps the entry's name, mtime and
// size when no filter is set or when the name contains *filter as a
// substring.
// NOTE(review): the call that applies this lambda to the cache is not
// visible here -- confirm.
612 void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional
<std::string
>& filter
, Formatter
* f
)
615 [&filter
, f
] (const string
& name
, const ObjectCacheEntry
& entry
) {
616 if (!filter
|| name
.find(*filter
) != name
.npos
) {
617 cache_list_dump_helper(f
, name
, entry
.info
.meta
.mtime
,
618 entry
.info
.meta
.size
);
// Looks `target` up in the service's cache; when an entry is found, opens a
// "cache_entry" object section and dumps the target name into it.
// NOTE(review): the remaining dumped fields and the return values are not
// visible here -- confirm.
623 int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string
& target
, Formatter
* f
)
625 if (const auto entry
= svc
->cache
.get(dpp
, target
)) {
626 f
->open_object_section("cache_entry");
627 f
->dump_string("name", target
.c_str());
// Removes `target` from the service's cache and returns the result of
// cache.invalidate_remove().
636 int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string
& target
)
638 return svc
->cache
.invalidate_remove(dpp
, target
);
// Invalidates every entry of the service's cache.
// NOTE(review): the return value is not visible here -- confirm.
641 int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
643 svc
->cache
.invalidate_all();