1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RGWCACHE_H
5 #define CEPH_RGWCACHE_H
10 #include "include/types.h"
11 #include "include/utime.h"
12 #include "include/assert.h"
13 #include "common/RWLock.h"
// Bit flags naming the parts of an ObjectCacheInfo. Each value is a distinct
// bit, so they can be OR-ed together: code later in this file assigns such
// combinations to ObjectCacheInfo::flags and passes them as the lookup mask
// to ObjectCache::get().
20 #define CACHE_FLAG_DATA 0x01
21 #define CACHE_FLAG_XATTRS 0x02
22 #define CACHE_FLAG_META 0x04
23 #define CACHE_FLAG_MODIFY_XATTRS 0x08
24 #define CACHE_FLAG_OBJV 0x10
// Logging shorthand that forwards to lsubdout(T::cct, rgw, v); it can only be
// used where a type named T that provides cct is in scope.
26 #define mydout(v) lsubdout(T::cct, rgw, v)
// Metadata part of a cached object. Code later in this file reads and writes
// its size and mtime members (info.meta.size / info.meta.mtime).
// Declares versioned encode()/decode(), dump() and generate_test_instances().
// NOTE(review): the embedded line numbers skip (e.g. 35 -> 40), so the member
// declarations and the encode/decode bodies are not visible in this listing.
28 struct ObjectMetaInfo
{
// Default construction zeroes size.
32 ObjectMetaInfo() : size(0) {}
34 void encode(bufferlist
& bl
) const {
35 ENCODE_START(2, 2, bl
);
40 void decode(bufferlist::iterator
& bl
) {
41 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl
);
46 void dump(Formatter
*f
) const;
47 static void generate_test_instances(list
<ObjectMetaInfo
*>& o
);
// Presumably supplies the free ::encode/::decode overloads for this type --
// TODO confirm against the macro definition.
49 WRITE_CLASS_ENCODER(ObjectMetaInfo
)
// Cached state for one object. Members visible here: the xattr map, a second
// xattr map named rm_xattrs, the object version, and time_added. Other
// members used later in this file (flags, data, meta) are declared on lines
// that are missing from this listing.
51 struct ObjectCacheInfo
{
56 map
<string
, bufferlist
> xattrs
;
57 map
<string
, bufferlist
> rm_xattrs
;
// version is value-initialized; time_added is stamped with the coarse
// monotonic clock when the object is constructed (ObjectCache::for_each
// compares it against the expiry interval).
59 obj_version version
= {};
60 ceph::coarse_mono_time time_added
= ceph::coarse_mono_clock::now();
62 ObjectCacheInfo() = default;
// Only the rm_xattrs and version fields of the encoding are visible here; the
// remaining encode statements are on omitted lines.
64 void encode(bufferlist
& bl
) const {
65 ENCODE_START(5, 3, bl
);
71 ::encode(rm_xattrs
, bl
);
73 ::encode(version
, bl
);
// Mirror of encode(); the lines between the visible decode calls (and any
// version checks they may contain) are omitted from this listing.
76 void decode(bufferlist::iterator
& bl
) {
77 DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl
);
84 ::decode(rm_xattrs
, bl
);
88 ::decode(version
, bl
);
91 void dump(Formatter
*f
) const;
92 static void generate_test_instances(list
<ObjectCacheInfo
*>& o
);
94 WRITE_CLASS_ENCODER(ObjectCacheInfo
)
// Notification payload describing a cache change. In this file it is filled
// in by RGWCache::distribute_cache() and decoded by RGWCache::watch_cb(),
// which also reads its op and obj members (declared on omitted lines).
96 struct RGWCacheNotifyInfo
{
// The cached object state carried by the notification.
99 ObjectCacheInfo obj_info
;
// op and ofs start at 0.
103 RGWCacheNotifyInfo() : op(0), ofs(0) {}
105 void encode(bufferlist
& obl
) const {
106 ENCODE_START(2, 2, obl
);
109 ::encode(obj_info
, obl
);
114 void decode(bufferlist::iterator
& ibl
) {
115 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, ibl
);
118 ::decode(obj_info
, ibl
);
123 void dump(Formatter
*f
) const;
124 static void generate_test_instances(list
<RGWCacheNotifyInfo
*>& o
);
126 WRITE_CLASS_ENCODER(RGWCacheNotifyInfo
)
// One cache slot: the cached info plus bookkeeping -- an iterator into a
// std::list<string> (lru_iter), a promotion counter/timestamp, and a list of
// (RGWChainedCache*, string) pairs (chained_entries).
128 struct ObjectCacheEntry
{
129 ObjectCacheInfo info
;
130 std::list
<string
>::iterator lru_iter
;
131 uint64_t lru_promotion_ts
;
133 std::list
<pair
<RGWChainedCache
*, string
> > chained_entries
;
// lru_promotion_ts and gen start at 0 (gen is declared on an omitted line).
135 ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
// Storage and LRU bookkeeping of the object cache (the enclosing class header
// is on a line missing from this listing).
//  cache_map     - entry lookup keyed by name.
//  lru           - list of names; presumably recency-ordered -- TODO confirm
//                  against the implementation file.
//  lru_size, lru_counter, lru_window - all zeroed by the constructor;
//                  lru_window is later set to rgw_cache_lru_size / 2.
//  chained_cache - RGWChainedCache pointers.
//  expiry        - interval compared with an entry's age in for_each().
139 std::map
<string
, ObjectCacheEntry
> cache_map
;
140 std::list
<string
> lru
;
141 unsigned long lru_size
;
142 unsigned long lru_counter
;
143 unsigned long lru_window
;
147 list
<RGWChainedCache
*> chained_cache
;
150 ceph::timespan expiry
;
// LRU and invalidation helpers. Only the declarations are present in this
// file section; their behavior is defined elsewhere.
// touch_lru and remove_lru take the entry name and a reference to an iterator
// into a std::list<string>; invalidate_lru takes the entry itself.
152 void touch_lru(const string
& name
, ObjectCacheEntry
& entry
, std::list
<string
>::iterator
& lru_iter
);
153 void remove_lru(const string
& name
, std::list
<string
>::iterator
& lru_iter
);
154 void invalidate_lru(ObjectCacheEntry
& entry
);
156 void do_invalidate_all();
// Zeroes the three LRU counters, names the lock "ObjectCache", and starts
// with a null context and the cache disabled.
158 ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL
), enabled(false) { }
// Lookup by name with a flag mask; fills the ObjectCacheInfo out-parameter
// and returns an int status (callers in this file treat 0 as a hit and
// negative values as failure). Defined elsewhere.
159 int get(const std::string
& name
, ObjectCacheInfo
& bl
, uint32_t mask
, rgw_cache_entry_info
*cache_info
);
// Convenience overload: constructs an ObjectCacheInfo in place, calls the
// four-argument get() with mask 0 and no cache_info, and returns boost::none
// when that call returns a negative value, otherwise the filled-in info.
160 boost::optional
<ObjectCacheInfo
> get(const std::string
& name
) {
161 boost::optional
<ObjectCacheInfo
> info
{boost::in_place_init
};
162 auto r
= get(name
, *info
, 0, nullptr);
163 return r
< 0 ? boost::none
: info
;
// Invokes f(name, entry) for entries of cache_map while holding a read lock
// (RWLock::RLocker) on `lock`.
// An entry is visited only when expiry is non-zero AND the entry's age
// (now - time_added) is less than expiry.
// NOTE(review): with a zero expiry the condition is always false, so no entry
// is visited at all -- confirm that this is the intended meaning of 0.
// NOTE(review): original lines 169 and 174-177 are missing from this listing;
// any additional guard they contain is not documented here.
167 void for_each(const F
& f
) {
168 RWLock::RLocker
l(lock
);
170 auto now
= ceph::coarse_mono_clock::now();
171 for (const auto& kv
: cache_map
) {
172 if (expiry
.count() && ((now
- kv
.second
.info
.time_added
) < expiry
)) {
173 f(kv
.first
, kv
.second
);
// Store `bl` under `name`; cache_info is an optional out-pointer (callers in
// this file frequently pass NULL). Defined elsewhere.
179 void put(const std::string
& name
, ObjectCacheInfo
& bl
, rgw_cache_entry_info
*cache_info
);
// Remove the entry called `name`; the meaning of the bool result is defined
// by the implementation (presumably "an entry was removed" -- TODO confirm).
180 bool remove(const std::string
& name
);
// Derives configuration-dependent state from the context:
//  - lru_window becomes half of the rgw_cache_lru_size setting;
//  - expiry becomes rgw_cache_expiry_interval, read as uint64_t and
//    interpreted as seconds.
// NOTE(review): the visible statements read the member `cct`, not the `_cct`
// parameter; the assignment cct = _cct is presumably on the omitted original
// line 182 -- confirm, otherwise these dereference the old (initially NULL)
// pointer.
181 void set_ctx(CephContext
*_cct
) {
183 lru_window
= cct
->_conf
->rgw_cache_lru_size
/ 2;
184 expiry
= std::chrono::seconds(cct
->_conf
->get_val
<uint64_t>(
185 "rgw_cache_expiry_interval"));
// Remaining public interface, declared here and defined elsewhere:
//  chain_cache_entry - takes a list of rgw_cache_entry_info pointers and a
//                      chained-cache entry, returns bool.
//  set_enabled       - takes the new enabled status.
//  chain_cache       - takes an RGWChainedCache pointer.
//  invalidate_all    - takes no arguments.
187 bool chain_cache_entry(list
<rgw_cache_entry_info
*>& cache_info_entries
, RGWChainedCache::Entry
*chained_entry
);
189 void set_enabled(bool status
);
191 void chain_cache(RGWChainedCache
*cache
);
192 void invalidate_all();
// Caching layer that derives from its template parameter T and overrides T's
// system-object operations to consult an ObjectCache member named `cache`
// (the template header and the member declaration are on omitted lines).
196 class RGWCache
: public T
// Raw listing is not cached: both calls forward unchanged to T.
200 int list_objects_raw_init(rgw_pool
& pool
, RGWAccessHandle
*handle
) {
201 return T::list_objects_raw_init(pool
, handle
);
203 int list_objects_raw_next(rgw_bucket_dir_entry
& obj
, RGWAccessHandle
*handle
) {
204 return T::list_objects_raw_next(obj
, handle
);
// Builds the cache key "<pool.name>+<pool.ns>+<oid>". The reserve() call
// pre-sizes the buffer for the three parts plus the two '+' separators.
// (The declaration of buf and the return statement are on omitted lines.)
207 string
normal_name(rgw_pool
& pool
, const std::string
& oid
) {
209 buf
.reserve(pool
.name
.size() + pool
.ns
.size() + oid
.size() + 2);
210 buf
.append(pool
.name
).append("+").append(pool
.ns
).append("+").append(oid
);
// Maps a source pool/object pair to the pool/object pair used for caching;
// defined after the class.
214 void normalize_pool_and_obj(rgw_pool
& src_pool
, const string
& src_obj
, rgw_pool
& dst_pool
, string
& dst_obj
);
// Overload for a raw object: uses obj.pool and obj.oid as they are.
215 string
normal_name(rgw_raw_obj
& obj
) {
216 return normal_name(obj
.pool
, obj
.oid
);
// Hands T::cct to the cache before delegating to T::init_rados(). The
// declaration of ret and the handling of its value are on omitted lines.
219 int init_rados() override
{
221 cache
.set_ctx(T::cct
);
222 ret
= T::init_rados();
// Body not visible in this listing.
229 bool need_watch_notify() override
{
// Sends a cache-change notification for obj; defined after the class.
233 int distribute_cache(const string
& normal_name
, rgw_raw_obj
& obj
, ObjectCacheInfo
& obj_info
, int op
);
// Handler for incoming notifications; defined after the class.
234 int watch_cb(uint64_t notify_id
,
236 uint64_t notifier_id
,
237 bufferlist
& bl
) override
;
// Forwards the enabled state to the underlying ObjectCache.
239 void set_cache_enabled(bool state
) override
{
240 cache
.set_enabled(state
);
// Forwards the chained cache to the underlying ObjectCache.
245 void register_chained_cache(RGWChainedCache
*cc
) override
{
246 cache
.chain_cache(cc
);
// Declarations of the system-object operations that this class layers the
// cache over; each is defined after the class.
// NOTE(review): unlike its siblings below, the visible declaration of
// system_obj_set_attrs carries no `override` -- confirm whether that is
// deliberate.
249 int system_obj_set_attrs(void *ctx
, rgw_raw_obj
& obj
,
250 map
<string
, bufferlist
>& attrs
,
251 map
<string
, bufferlist
>* rmattrs
,
252 RGWObjVersionTracker
*objv_tracker
);
// One parameter (original line 255) is missing from this listing.
253 int put_system_obj_impl(rgw_raw_obj
& obj
, uint64_t size
, real_time
*mtime
,
254 map
<std::string
, bufferlist
>& attrs
, int flags
,
256 RGWObjVersionTracker
*objv_tracker
,
257 real_time set_mtime
) override
;
// objv_tracker is optional here (defaults to nullptr).
258 int put_system_obj_data(void *ctx
, rgw_raw_obj
& obj
, bufferlist
& bl
, off_t ofs
, bool exclusive
,
259 RGWObjVersionTracker
*objv_tracker
= nullptr) override
;
// refresh_version defaults to boost::none.
261 int get_system_obj(RGWObjectCtx
& obj_ctx
, RGWRados::SystemObject::Read::GetObjState
& read_state
,
262 RGWObjVersionTracker
*objv_tracker
, rgw_raw_obj
& obj
,
263 bufferlist
& bl
, off_t ofs
, off_t end
,
264 map
<string
, bufferlist
> *attrs
,
265 rgw_cache_entry_info
*cache_info
,
266 boost::optional
<obj_version
> refresh_version
= boost::none
) override
;
268 int raw_obj_stat(rgw_raw_obj
& obj
, uint64_t *psize
, real_time
*pmtime
, uint64_t *epoch
, map
<string
, bufferlist
> *attrs
,
269 bufferlist
*first_chunk
, RGWObjVersionTracker
*objv_tracker
) override
;
271 int delete_system_obj(rgw_raw_obj
& obj
, RGWObjVersionTracker
*objv_tracker
) override
;
// Forwards to ObjectCache::chain_cache_entry and returns its result.
273 bool chain_cache_entry(list
<rgw_cache_entry_info
*>& cache_info_entries
, RGWChainedCache::Entry
*chained_entry
) override
{
274 return cache
.chain_cache_entry(cache_info_entries
, chained_entry
);
// Cache inspection/maintenance entry points (list, inspect one entry, erase
// one entry, drop everything); each is defined after the class.
276 void call_list(const boost::optional
<std::string
>& filter
,
277 Formatter
* format
) override
;
278 bool call_inspect(const std::string
& target
, Formatter
* format
) override
;
279 bool call_erase(const std::string
& target
) override
;
280 void call_zap() override
;
// Computes the (dst_pool, dst_obj) pair used for caching from a source pair.
// Visible logic: a test on whether src_obj is non-empty, and assignments that
// set dst_pool to the zone's domain_root and dst_obj to the source pool name.
// NOTE(review): the body of the non-empty branch and the `else` (original
// lines 287-289) are missing here; the two visible assignments are presumably
// the empty-src_obj fallback -- confirm against the full source.
284 void RGWCache
<T
>::normalize_pool_and_obj(rgw_pool
& src_pool
, const string
& src_obj
, rgw_pool
& dst_pool
, string
& dst_obj
)
286 if (src_obj
.size()) {
290 dst_pool
= T::get_zone_params().domain_root
;
291 dst_obj
= src_pool
.name
;
// Deletes a system object through T after notifying about the removal.
// Visible steps: normalize obj.pool/obj.oid into pool/oid, build the cache
// key, send a REMOVE_OBJ notification carrying a default-constructed
// ObjectCacheInfo, then return T::delete_system_obj()'s result.
// NOTE(review): the key is built with normal_name(obj), i.e. from the raw
// pool/oid, while the other methods in this file use the normalized pool/oid
// -- confirm this difference is intended.
// NOTE(review): the return value of distribute_cache() is not used on the
// visible line. Original lines 303-304 are missing from this listing.
296 int RGWCache
<T
>::delete_system_obj(rgw_raw_obj
& obj
, RGWObjVersionTracker
*objv_tracker
)
300 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
302 string name
= normal_name(obj
);
305 ObjectCacheInfo info
;
306 distribute_cache(name
, obj
, info
, REMOVE_OBJ
);
308 return T::delete_system_obj(obj
, objv_tracker
);
// Reads a system object, serving it from the cache when possible.
// Visible flow (several guarding conditions are on omitted lines):
//  1. Normalize obj.pool/obj.oid into pool/oid.
//  2. A direct pass-through to T::get_system_obj (original line 323); the
//     condition that selects it is not shown.
//  3. Build the cache key and the lookup mask: CACHE_FLAG_DATA always, with
//     CACHE_FLAG_OBJV and CACHE_FLAG_XATTRS added on lines whose conditions
//     are not shown (presumably when objv_tracker / attrs are non-null --
//     TODO confirm).
//  4. Cache hit: cache.get() returned 0 and either no refresh_version was
//     given or info.version.compare(&*refresh_version) evaluated to false.
//     Data, read_version and xattrs are then taken from the cached info.
//  5. Otherwise read through T. On -ENOENT the info is put into the cache;
//     on the success path the result is cached unless obl.length() equals
//     end + 1 (see the inline comment).
// NOTE(review): obl.length() == end + 1 compares an unsigned length with a
// signed off_t expression.
312 int RGWCache
<T
>::get_system_obj(RGWObjectCtx
& obj_ctx
, RGWRados::SystemObject::Read::GetObjState
& read_state
,
313 RGWObjVersionTracker
*objv_tracker
, rgw_raw_obj
& obj
,
314 bufferlist
& obl
, off_t ofs
, off_t end
,
315 map
<string
, bufferlist
> *attrs
,
316 rgw_cache_entry_info
*cache_info
,
317 boost::optional
<obj_version
> refresh_version
)
321 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
323 return T::get_system_obj(obj_ctx
, read_state
, objv_tracker
, obj
, obl
, ofs
, end
, attrs
, cache_info
);
// The key uses obj.pool together with the normalized oid.
325 string name
= normal_name(obj
.pool
, oid
);
327 ObjectCacheInfo info
;
329 uint32_t flags
= CACHE_FLAG_DATA
;
331 flags
|= CACHE_FLAG_OBJV
;
333 flags
|= CACHE_FLAG_XATTRS
;
335 if ((cache
.get(name
, info
, flags
, cache_info
) == 0) &&
336 (!refresh_version
|| !info
.version
.compare(&(*refresh_version
)))) {
// Hit path: hand back the cached data, version and xattrs.
340 bufferlist
& bl
= info
.data
;
342 bufferlist::iterator i
= bl
.begin();
348 objv_tracker
->read_version
= info
.version
;
350 *attrs
= info
.xattrs
;
// Miss path: read through the underlying store.
353 int r
= T::get_system_obj(obj_ctx
, read_state
, objv_tracker
, obj
, obl
, ofs
, end
, attrs
, cache_info
);
355 if (r
== -ENOENT
) { // only update ENOENT, we'd rather retry other errors
357 cache
.put(name
, info
, cache_info
);
362 if (obl
.length() == end
+ 1) {
363 /* in this case, most likely object contains more data, we can't cache it */
// Populate info from what was just read, then store it.
368 bufferlist
& bl
= info
.data
;
370 bufferlist::iterator o
= obl
.begin();
375 info
.version
= objv_tracker
->read_version
;
378 info
.xattrs
= *attrs
;
380 cache
.put(name
, info
, cache_info
);
// Sets/removes xattrs on a system object through T and mirrors the change in
// the cache.
// Visible steps: normalize the pool/oid; fill an ObjectCacheInfo with the
// attributes to remove, flags CACHE_FLAG_MODIFY_XATTRS (plus CACHE_FLAG_OBJV
// and the tracker's write_version); call T::system_obj_set_attrs(); put the
// info into the local cache; send an UPDATE_OBJ notification and log an error
// when distribution fails.
// NOTE(review): the null checks guarding *rmattrs and objv_tracker->, and the
// checks on ret / r, are on lines missing from this listing -- confirm they
// exist in the full source.
385 int RGWCache
<T
>::system_obj_set_attrs(void *ctx
, rgw_raw_obj
& obj
,
386 map
<string
, bufferlist
>& attrs
,
387 map
<string
, bufferlist
>* rmattrs
,
388 RGWObjVersionTracker
*objv_tracker
)
392 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
393 ObjectCacheInfo info
;
396 info
.rm_xattrs
= *rmattrs
;
398 info
.flags
= CACHE_FLAG_MODIFY_XATTRS
;
400 info
.version
= objv_tracker
->write_version
;
401 info
.flags
|= CACHE_FLAG_OBJV
;
403 int ret
= T::system_obj_set_attrs(ctx
, obj
, attrs
, rmattrs
, objv_tracker
);
404 string name
= normal_name(pool
, oid
);
406 cache
.put(name
, info
, NULL
);
407 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
);
409 mydout(0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
// Writes a whole system object through T and records it in the cache.
// Visible steps: normalize the pool/oid; mark the info as carrying xattrs,
// data and meta (plus CACHE_FLAG_OBJV and the tracker's write_version); call
// T::put_system_obj_impl() capturing the resulting mtime; copy that mtime to
// the caller's *mtime and into info.meta along with size; put the info into
// the local cache; and, unless PUT_OBJ_EXCL is set in flags, send an
// UPDATE_OBJ notification and log an error when distribution fails.
// NOTE(review): two parameters (original lines 420 and 422), the null checks
// on objv_tracker / mtime and the check on ret are on lines missing from this
// listing.
418 int RGWCache
<T
>::put_system_obj_impl(rgw_raw_obj
& obj
, uint64_t size
, real_time
*mtime
,
419 map
<std::string
, bufferlist
>& attrs
, int flags
,
421 RGWObjVersionTracker
*objv_tracker
,
426 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
427 ObjectCacheInfo info
;
431 info
.flags
= CACHE_FLAG_XATTRS
| CACHE_FLAG_DATA
| CACHE_FLAG_META
;
433 info
.version
= objv_tracker
->write_version
;
434 info
.flags
|= CACHE_FLAG_OBJV
;
// T reports the mtime it applied through result_mtime.
436 ceph::real_time result_mtime
;
437 int ret
= T::put_system_obj_impl(obj
, size
, &result_mtime
, attrs
, flags
, data
,
438 objv_tracker
, set_mtime
);
440 *mtime
= result_mtime
;
442 info
.meta
.mtime
= result_mtime
;
443 info
.meta
.size
= size
;
444 string name
= normal_name(pool
, oid
);
446 cache
.put(name
, info
, NULL
);
447 // Only distribute the cache information if we did not just create
448 // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies
449 // PUT_OBJ_CREATE. Generally speaking, when successfully creating
450 // a system object with the exclusive flag it is not necessary to
451 // call distribute_cache, as a) it's unclear whether other RGWs
452 // will need that system object in the near-term and b) it
453 // generates additional network traffic.
454 if (!(flags
& PUT_OBJ_EXCL
)) {
455 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
);
457 mydout(0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
// Writes object data through T and, when the write is cacheable, mirrors it
// in the cache.
// Visible steps: normalize the pool/oid; treat the write as a cache candidate
// only when ofs is 0 or -1, in which case info.meta.size is set to the data
// length and flags to CACHE_FLAG_DATA; add CACHE_FLAG_OBJV and the tracker's
// write_version; call T::put_system_obj_data(); put the info into the local
// cache and send an UPDATE_OBJ notification, logging an error when
// distribution fails.
// NOTE(review): where `cacheable` is set to true, the null check on
// objv_tracker, and the conditions around the put/distribute calls are on
// lines missing from this listing.
467 int RGWCache
<T
>::put_system_obj_data(void *ctx
, rgw_raw_obj
& obj
, bufferlist
& data
, off_t ofs
, bool exclusive
,
468 RGWObjVersionTracker
*objv_tracker
)
472 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
473 ObjectCacheInfo info
;
474 bool cacheable
= false;
475 if ((ofs
== 0) || (ofs
== -1)) {
478 info
.meta
.size
= data
.length();
480 info
.flags
= CACHE_FLAG_DATA
;
483 info
.version
= objv_tracker
->write_version
;
484 info
.flags
|= CACHE_FLAG_OBJV
;
486 int ret
= T::put_system_obj_data(ctx
, obj
, data
, ofs
, exclusive
, objv_tracker
);
488 string name
= normal_name(pool
, oid
);
490 cache
.put(name
, info
, NULL
);
491 int r
= distribute_cache(name
, obj
, info
, UPDATE_OBJ
);
493 mydout(0) << "ERROR: failed to distribute cache for " << obj
<< dendl
;
// Stats a system object, answering from the cache when possible.
// Visible flow (branch conditions are on omitted lines):
//  1. Normalize the pool/oid and build the cache key.
//  2. Look up with mask META | XATTRS (plus OBJV on a line whose condition is
//     not shown). From a cached info, size and mtime come from info.meta and
//     the tracker's read_version from info.version.
//  3. Otherwise call T::raw_obj_stat(), which writes xattrs directly into
//     info.xattrs. One visible cache.put() follows immediately (original line
//     537; presumably the -ENOENT case -- TODO confirm), and after a
//     successful stat the info is filled with mtime, size, flags and version
//     and put into the cache.
//  4. The caller's *attrs receives info.xattrs.
// NOTE(review): the declarations of size/mtime/epoch, the copies to
// psize/pmtime/pepoch and all null checks are on lines missing from this
// listing.
503 int RGWCache
<T
>::raw_obj_stat(rgw_raw_obj
& obj
, uint64_t *psize
, real_time
*pmtime
,
504 uint64_t *pepoch
, map
<string
, bufferlist
> *attrs
,
505 bufferlist
*first_chunk
, RGWObjVersionTracker
*objv_tracker
)
509 normalize_pool_and_obj(obj
.pool
, obj
.oid
, pool
, oid
);
511 string name
= normal_name(pool
, oid
);
517 ObjectCacheInfo info
;
518 uint32_t flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
520 flags
|= CACHE_FLAG_OBJV
;
521 int r
= cache
.get(name
, info
, flags
, NULL
);
526 size
= info
.meta
.size
;
527 mtime
= info
.meta
.mtime
;
530 objv_tracker
->read_version
= info
.version
;
533 r
= T::raw_obj_stat(obj
, &size
, &mtime
, &epoch
, &info
.xattrs
, first_chunk
, objv_tracker
);
537 cache
.put(name
, info
, NULL
);
543 info
.meta
.mtime
= mtime
;
544 info
.meta
.size
= size
;
545 info
.flags
= CACHE_FLAG_META
| CACHE_FLAG_XATTRS
;
547 info
.flags
|= CACHE_FLAG_OBJV
;
548 info
.version
= objv_tracker
->read_version
;
550 cache
.put(name
, info
, NULL
);
559 *attrs
= info
.xattrs
;
// Packs obj_info into an RGWCacheNotifyInfo and hands the encoded buffer to
// T::distribute() under the given key, returning its result.
// NOTE(review): the assignments of op / obj and the encoding into bl are on
// lines missing from this listing.
564 int RGWCache
<T
>::distribute_cache(const string
& normal_name
, rgw_raw_obj
& obj
, ObjectCacheInfo
& obj_info
, int op
)
566 RGWCacheNotifyInfo info
;
570 info
.obj_info
= obj_info
;
574 return T::distribute(normal_name
, bl
);
// Handles an incoming cache notification.
// Visible flow: decode an RGWCacheNotifyInfo from the start of bl; decoding
// failures (buffer::end_of_buffer, buffer::error) are logged at level 0.
// The notified object's pool/oid are normalized into a cache key, the carried
// obj_info is put into the local cache, and an unrecognized op is logged as a
// warning.
// NOTE(review): one parameter (original line 579), the return statements and
// the dispatch on info.op (including any removal branch) are on lines missing
// from this listing.
578 int RGWCache
<T
>::watch_cb(uint64_t notify_id
,
580 uint64_t notifier_id
,
583 RGWCacheNotifyInfo info
;
586 bufferlist::iterator iter
= bl
.begin();
587 ::decode(info
, iter
);
588 } catch (buffer::end_of_buffer
& err
) {
589 mydout(0) << "ERROR: got bad notification" << dendl
;
591 } catch (buffer::error
& err
) {
592 mydout(0) << "ERROR: buffer::error" << dendl
;
598 normalize_pool_and_obj(info
.obj
.pool
, info
.obj
.oid
, pool
, oid
);
599 string name
= normal_name(pool
, oid
);
603 cache
.put(name
, info
.obj_info
, NULL
);
609 mydout(0) << "WARNING: got unknown notification op: " << info
.op
<< dendl
;
// Lists cache entries through a formatter.
// The visible lambda receives (name, entry) and, when no filter is given or
// the name contains the filter text, calls T::cache_list_dump_helper with
// the entry's name, mtime and size.
// NOTE(review): the second parameter and the call that receives the lambda
// (presumably cache.for_each -- TODO confirm) are on lines missing from this
// listing.
617 void RGWCache
<T
>::call_list(const boost::optional
<std::string
>& filter
,
621 [this, &filter
, f
] (const string
& name
, const ObjectCacheEntry
& entry
) {
622 if (!filter
|| name
.find(*filter
) != name
.npos
) {
623 T::cache_list_dump_helper(f
, name
, entry
.info
.meta
.mtime
,
624 entry
.info
.meta
.size
);
// Dumps one cache entry: when cache.get(target) yields a value, opens a
// "cache_entry" section on the formatter and writes the target as "name".
// NOTE(review): the remaining dump calls and the return values are on lines
// missing from this listing.
630 bool RGWCache
<T
>::call_inspect(const std::string
& target
, Formatter
* f
)
632 if (const auto entry
= cache
.get(target
)) {
633 f
->open_object_section("cache_entry");
634 f
->dump_string("name", target
.c_str());
// Removes the entry named target from the cache and returns
// ObjectCache::remove()'s result.
644 bool RGWCache
<T
>::call_erase(const std::string
& target
)
646 return cache
.remove(target
);
// Drops the whole cache by calling ObjectCache::invalidate_all().
650 void RGWCache
<T
>::call_zap()
652 cache
.invalidate_all();