]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cache.h
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_cache.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_RGWCACHE_H
5#define CEPH_RGWCACHE_H
6
7#include "rgw_rados.h"
8#include <string>
9#include <map>
10#include "include/types.h"
11#include "include/utime.h"
12#include "include/assert.h"
13#include "common/RWLock.h"
14
/* Operation codes carried in RGWCacheNotifyInfo::op. */
enum {
  UPDATE_OBJ = 0,  /* peers should store the attached ObjectCacheInfo */
  REMOVE_OBJ = 1,  /* peers should drop their entry for the object */
};

/*
 * Bit flags stored in ObjectCacheInfo::flags and passed as the lookup mask
 * to ObjectCache::get().
 */
#define CACHE_FLAG_DATA           0x01
#define CACHE_FLAG_XATTRS         0x02
#define CACHE_FLAG_META           0x04
#define CACHE_FLAG_MODIFY_XATTRS  0x08
#define CACHE_FLAG_OBJV           0x10

/*
 * Logging shorthand for the "rgw" subsystem. It expands to T::cct, so it is
 * only usable inside members of a template that has a base/parameter T.
 */
#define mydout(v) lsubdout(T::cct, rgw, v)
27
28struct ObjectMetaInfo {
29 uint64_t size;
30 real_time mtime;
31
32 ObjectMetaInfo() : size(0) {}
33
34 void encode(bufferlist& bl) const {
35 ENCODE_START(2, 2, bl);
36 ::encode(size, bl);
37 ::encode(mtime, bl);
38 ENCODE_FINISH(bl);
39 }
40 void decode(bufferlist::iterator& bl) {
41 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
42 ::decode(size, bl);
43 ::decode(mtime, bl);
44 DECODE_FINISH(bl);
45 }
46 void dump(Formatter *f) const;
47 static void generate_test_instances(list<ObjectMetaInfo*>& o);
48};
49WRITE_CLASS_ENCODER(ObjectMetaInfo)
50
51struct ObjectCacheInfo {
b32b8144
FG
52 int status = 0;
53 uint32_t flags = 0;
54 uint64_t epoch = 0;
7c673cae
FG
55 bufferlist data;
56 map<string, bufferlist> xattrs;
57 map<string, bufferlist> rm_xattrs;
58 ObjectMetaInfo meta;
b32b8144 59 obj_version version = {};
28e407b8 60 ceph::coarse_mono_time time_added;
7c673cae 61
b32b8144 62 ObjectCacheInfo() = default;
7c673cae
FG
63
64 void encode(bufferlist& bl) const {
65 ENCODE_START(5, 3, bl);
66 ::encode(status, bl);
67 ::encode(flags, bl);
68 ::encode(data, bl);
69 ::encode(xattrs, bl);
70 ::encode(meta, bl);
71 ::encode(rm_xattrs, bl);
72 ::encode(epoch, bl);
73 ::encode(version, bl);
74 ENCODE_FINISH(bl);
75 }
76 void decode(bufferlist::iterator& bl) {
77 DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl);
78 ::decode(status, bl);
79 ::decode(flags, bl);
80 ::decode(data, bl);
81 ::decode(xattrs, bl);
82 ::decode(meta, bl);
83 if (struct_v >= 2)
84 ::decode(rm_xattrs, bl);
85 if (struct_v >= 4)
86 ::decode(epoch, bl);
87 if (struct_v >= 5)
88 ::decode(version, bl);
89 DECODE_FINISH(bl);
90 }
91 void dump(Formatter *f) const;
92 static void generate_test_instances(list<ObjectCacheInfo*>& o);
93};
94WRITE_CLASS_ENCODER(ObjectCacheInfo)
95
96struct RGWCacheNotifyInfo {
97 uint32_t op;
98 rgw_raw_obj obj;
99 ObjectCacheInfo obj_info;
100 off_t ofs;
101 string ns;
102
103 RGWCacheNotifyInfo() : op(0), ofs(0) {}
104
105 void encode(bufferlist& obl) const {
106 ENCODE_START(2, 2, obl);
107 ::encode(op, obl);
108 ::encode(obj, obl);
109 ::encode(obj_info, obl);
110 ::encode(ofs, obl);
111 ::encode(ns, obl);
112 ENCODE_FINISH(obl);
113 }
114 void decode(bufferlist::iterator& ibl) {
115 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, ibl);
116 ::decode(op, ibl);
117 ::decode(obj, ibl);
118 ::decode(obj_info, ibl);
119 ::decode(ofs, ibl);
120 ::decode(ns, ibl);
121 DECODE_FINISH(ibl);
122 }
123 void dump(Formatter *f) const;
124 static void generate_test_instances(list<RGWCacheNotifyInfo*>& o);
125};
126WRITE_CLASS_ENCODER(RGWCacheNotifyInfo)
127
128struct ObjectCacheEntry {
129 ObjectCacheInfo info;
130 std::list<string>::iterator lru_iter;
131 uint64_t lru_promotion_ts;
132 uint64_t gen;
133 std::list<pair<RGWChainedCache *, string> > chained_entries;
134
135 ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
136};
137
138class ObjectCache {
139 std::map<string, ObjectCacheEntry> cache_map;
140 std::list<string> lru;
141 unsigned long lru_size;
142 unsigned long lru_counter;
143 unsigned long lru_window;
144 RWLock lock;
145 CephContext *cct;
146
147 list<RGWChainedCache *> chained_cache;
148
149 bool enabled;
b32b8144 150 ceph::timespan expiry;
7c673cae 151
3a9019d9
FG
152 void touch_lru(const string& name, ObjectCacheEntry& entry, std::list<string>::iterator& lru_iter);
153 void remove_lru(const string& name, std::list<string>::iterator& lru_iter);
b32b8144 154 void invalidate_lru(ObjectCacheEntry& entry);
7c673cae
FG
155
156 void do_invalidate_all();
157public:
158 ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { }
3a9019d9
FG
159 int get(const std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
160 boost::optional<ObjectCacheInfo> get(const std::string& name) {
161 boost::optional<ObjectCacheInfo> info{boost::in_place_init};
162 auto r = get(name, *info, 0, nullptr);
163 return r < 0 ? boost::none : info;
164 }
165
166 template<typename F>
167 void for_each(const F& f) {
168 RWLock::RLocker l(lock);
169 if (enabled) {
170 auto now = ceph::coarse_mono_clock::now();
171 for (const auto& kv : cache_map) {
172 if (expiry.count() && ((now - kv.second.info.time_added) < expiry)) {
173 f(kv.first, kv.second);
174 }
175 }
176 }
177 }
178
179 void put(const std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info);
180 bool remove(const std::string& name);
7c673cae
FG
181 void set_ctx(CephContext *_cct) {
182 cct = _cct;
183 lru_window = cct->_conf->rgw_cache_lru_size / 2;
b32b8144
FG
184 expiry = std::chrono::seconds(cct->_conf->get_val<uint64_t>(
185 "rgw_cache_expiry_interval"));
7c673cae
FG
186 }
187 bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry);
188
189 void set_enabled(bool status);
190
191 void chain_cache(RGWChainedCache *cache);
192 void invalidate_all();
193};
194
195template <class T>
196class RGWCache : public T
197{
198 ObjectCache cache;
199
200 int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) {
201 return T::list_objects_raw_init(pool, handle);
202 }
203 int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) {
204 return T::list_objects_raw_next(obj, handle);
205 }
206
207 string normal_name(rgw_pool& pool, const std::string& oid) {
208 std::string buf;
209 buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
210 buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
211 return buf;
212 }
213
214 void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
215 string normal_name(rgw_raw_obj& obj) {
216 return normal_name(obj.pool, obj.oid);
217 }
218
219 int init_rados() override {
220 int ret;
221 cache.set_ctx(T::cct);
222 ret = T::init_rados();
223 if (ret < 0)
224 return ret;
225
226 return 0;
227 }
228
229 bool need_watch_notify() override {
230 return true;
231 }
232
233 int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op);
234 int watch_cb(uint64_t notify_id,
235 uint64_t cookie,
236 uint64_t notifier_id,
237 bufferlist& bl) override;
238
239 void set_cache_enabled(bool state) override {
240 cache.set_enabled(state);
241 }
242public:
243 RGWCache() {}
244
245 void register_chained_cache(RGWChainedCache *cc) override {
246 cache.chain_cache(cc);
247 }
248
249 int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
250 map<string, bufferlist>& attrs,
251 map<string, bufferlist>* rmattrs,
252 RGWObjVersionTracker *objv_tracker);
253 int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
254 map<std::string, bufferlist>& attrs, int flags,
255 bufferlist& data,
256 RGWObjVersionTracker *objv_tracker,
257 real_time set_mtime) override;
258 int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive,
259 RGWObjVersionTracker *objv_tracker = nullptr) override;
260
261 int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
262 RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
263 bufferlist& bl, off_t ofs, off_t end,
264 map<string, bufferlist> *attrs,
b32b8144
FG
265 rgw_cache_entry_info *cache_info,
266 boost::optional<obj_version> refresh_version = boost::none) override;
7c673cae
FG
267
268 int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
269 bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override;
270
271 int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override;
272
273 bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) override {
274 return cache.chain_cache_entry(cache_info_entries, chained_entry);
275 }
3a9019d9
FG
276 void call_list(const boost::optional<std::string>& filter,
277 Formatter* format) override;
278 bool call_inspect(const std::string& target, Formatter* format) override;
279 bool call_erase(const std::string& target) override;
280 void call_zap() override;
7c673cae
FG
281};
282
283template <class T>
284void RGWCache<T>::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
285{
286 if (src_obj.size()) {
287 dst_pool = src_pool;
288 dst_obj = src_obj;
289 } else {
290 dst_pool = T::get_zone_params().domain_root;
291 dst_obj = src_pool.name;
292 }
293}
294
295template <class T>
296int RGWCache<T>::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
297{
298 rgw_pool pool;
299 string oid;
300 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
301
302 string name = normal_name(obj);
303 cache.remove(name);
304
305 ObjectCacheInfo info;
306 distribute_cache(name, obj, info, REMOVE_OBJ);
307
308 return T::delete_system_obj(obj, objv_tracker);
309}
310
311template <class T>
312int RGWCache<T>::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
313 RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
314 bufferlist& obl, off_t ofs, off_t end,
315 map<string, bufferlist> *attrs,
b32b8144
FG
316 rgw_cache_entry_info *cache_info,
317 boost::optional<obj_version> refresh_version)
7c673cae
FG
318{
319 rgw_pool pool;
320 string oid;
321 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
322 if (ofs != 0)
323 return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
324
325 string name = normal_name(obj.pool, oid);
326
327 ObjectCacheInfo info;
328
329 uint32_t flags = CACHE_FLAG_DATA;
330 if (objv_tracker)
331 flags |= CACHE_FLAG_OBJV;
332 if (attrs)
333 flags |= CACHE_FLAG_XATTRS;
b32b8144
FG
334
335 if ((cache.get(name, info, flags, cache_info) == 0) &&
336 (!refresh_version || !info.version.compare(&(*refresh_version)))) {
7c673cae
FG
337 if (info.status < 0)
338 return info.status;
339
340 bufferlist& bl = info.data;
341
342 bufferlist::iterator i = bl.begin();
343
344 obl.clear();
345
346 i.copy_all(obl);
347 if (objv_tracker)
348 objv_tracker->read_version = info.version;
349 if (attrs)
350 *attrs = info.xattrs;
351 return bl.length();
352 }
353 int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
354 if (r < 0) {
355 if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
356 info.status = r;
357 cache.put(name, info, cache_info);
358 }
359 return r;
360 }
361
362 if (obl.length() == end + 1) {
363 /* in this case, most likely object contains more data, we can't cache it */
364 return r;
365 }
366
367 bufferptr p(r);
368 bufferlist& bl = info.data;
369 bl.clear();
370 bufferlist::iterator o = obl.begin();
371 o.copy_all(bl);
372 info.status = 0;
373 info.flags = flags;
374 if (objv_tracker) {
375 info.version = objv_tracker->read_version;
376 }
377 if (attrs) {
378 info.xattrs = *attrs;
379 }
380 cache.put(name, info, cache_info);
381 return r;
382}
383
384template <class T>
385int RGWCache<T>::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
386 map<string, bufferlist>& attrs,
387 map<string, bufferlist>* rmattrs,
388 RGWObjVersionTracker *objv_tracker)
389{
390 rgw_pool pool;
391 string oid;
392 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
393 ObjectCacheInfo info;
394 info.xattrs = attrs;
395 if (rmattrs)
396 info.rm_xattrs = *rmattrs;
397 info.status = 0;
398 info.flags = CACHE_FLAG_MODIFY_XATTRS;
399 if (objv_tracker) {
400 info.version = objv_tracker->write_version;
401 info.flags |= CACHE_FLAG_OBJV;
402 }
403 int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker);
404 string name = normal_name(pool, oid);
405 if (ret >= 0) {
406 cache.put(name, info, NULL);
407 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
408 if (r < 0)
409 mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
410 } else {
411 cache.remove(name);
412 }
413
414 return ret;
415}
416
417template <class T>
418int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
419 map<std::string, bufferlist>& attrs, int flags,
420 bufferlist& data,
421 RGWObjVersionTracker *objv_tracker,
422 real_time set_mtime)
423{
424 rgw_pool pool;
425 string oid;
426 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
427 ObjectCacheInfo info;
428 info.xattrs = attrs;
429 info.status = 0;
430 info.data = data;
431 info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
432 if (objv_tracker) {
433 info.version = objv_tracker->write_version;
434 info.flags |= CACHE_FLAG_OBJV;
435 }
436 ceph::real_time result_mtime;
437 int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data,
438 objv_tracker, set_mtime);
439 if (mtime) {
440 *mtime = result_mtime;
441 }
442 info.meta.mtime = result_mtime;
443 info.meta.size = size;
444 string name = normal_name(pool, oid);
445 if (ret >= 0) {
446 cache.put(name, info, NULL);
b32b8144
FG
447 // Only distribute the cache information if we did not just create
448 // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies
449 // PUT_OBJ_CREATE. Generally speaking, when successfully creating
450 // a system object with the exclusive flag it is not necessary to
451 // call distribute_cache, as a) it's unclear whether other RGWs
452 // will need that system object in the near-term and b) it
453 // generates additional network traffic.
454 if (!(flags & PUT_OBJ_EXCL)) {
455 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
456 if (r < 0)
457 mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
458 }
7c673cae
FG
459 } else {
460 cache.remove(name);
461 }
462
463 return ret;
464}
465
466template <class T>
467int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive,
468 RGWObjVersionTracker *objv_tracker)
469{
470 rgw_pool pool;
471 string oid;
472 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
473 ObjectCacheInfo info;
474 bool cacheable = false;
475 if ((ofs == 0) || (ofs == -1)) {
476 cacheable = true;
477 info.data = data;
478 info.meta.size = data.length();
479 info.status = 0;
480 info.flags = CACHE_FLAG_DATA;
481 }
482 if (objv_tracker) {
483 info.version = objv_tracker->write_version;
484 info.flags |= CACHE_FLAG_OBJV;
485 }
486 int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
487 if (cacheable) {
488 string name = normal_name(pool, oid);
489 if (ret >= 0) {
490 cache.put(name, info, NULL);
491 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
492 if (r < 0)
493 mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
494 } else {
495 cache.remove(name);
496 }
497 }
498
499 return ret;
500}
501
502template <class T>
503int RGWCache<T>::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
504 uint64_t *pepoch, map<string, bufferlist> *attrs,
505 bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker)
506{
507 rgw_pool pool;
508 string oid;
509 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
510
511 string name = normal_name(pool, oid);
512
513 uint64_t size;
514 real_time mtime;
515 uint64_t epoch;
516
517 ObjectCacheInfo info;
518 uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
519 if (objv_tracker)
520 flags |= CACHE_FLAG_OBJV;
521 int r = cache.get(name, info, flags, NULL);
522 if (r == 0) {
523 if (info.status < 0)
524 return info.status;
525
526 size = info.meta.size;
527 mtime = info.meta.mtime;
528 epoch = info.epoch;
529 if (objv_tracker)
530 objv_tracker->read_version = info.version;
531 goto done;
532 }
533 r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
534 if (r < 0) {
535 if (r == -ENOENT) {
536 info.status = r;
537 cache.put(name, info, NULL);
538 }
539 return r;
540 }
541 info.status = 0;
542 info.epoch = epoch;
543 info.meta.mtime = mtime;
544 info.meta.size = size;
545 info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
546 if (objv_tracker) {
547 info.flags |= CACHE_FLAG_OBJV;
548 info.version = objv_tracker->read_version;
549 }
550 cache.put(name, info, NULL);
551done:
552 if (psize)
553 *psize = size;
554 if (pmtime)
555 *pmtime = mtime;
556 if (pepoch)
557 *pepoch = epoch;
558 if (attrs)
559 *attrs = info.xattrs;
560 return 0;
561}
562
563template <class T>
564int RGWCache<T>::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
565{
566 RGWCacheNotifyInfo info;
567
568 info.op = op;
569
570 info.obj_info = obj_info;
571 info.obj = obj;
572 bufferlist bl;
573 ::encode(info, bl);
574 return T::distribute(normal_name, bl);
575}
576
577template <class T>
578int RGWCache<T>::watch_cb(uint64_t notify_id,
579 uint64_t cookie,
580 uint64_t notifier_id,
581 bufferlist& bl)
582{
583 RGWCacheNotifyInfo info;
584
585 try {
586 bufferlist::iterator iter = bl.begin();
587 ::decode(info, iter);
588 } catch (buffer::end_of_buffer& err) {
589 mydout(0) << "ERROR: got bad notification" << dendl;
590 return -EIO;
591 } catch (buffer::error& err) {
592 mydout(0) << "ERROR: buffer::error" << dendl;
593 return -EIO;
594 }
595
596 rgw_pool pool;
597 string oid;
598 normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
599 string name = normal_name(pool, oid);
600
601 switch (info.op) {
602 case UPDATE_OBJ:
603 cache.put(name, info.obj_info, NULL);
604 break;
605 case REMOVE_OBJ:
606 cache.remove(name);
607 break;
608 default:
609 mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
610 return -EINVAL;
611 }
612
613 return 0;
614}
615
3a9019d9
FG
616template<typename T>
617void RGWCache<T>::call_list(const boost::optional<std::string>& filter,
618 Formatter* f)
619{
620 cache.for_each(
621 [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
622 if (!filter || name.find(*filter) != name.npos) {
623 T::cache_list_dump_helper(f, name, entry.info.meta.mtime,
624 entry.info.meta.size);
625 }
626 });
627}
628
629template<typename T>
630bool RGWCache<T>::call_inspect(const std::string& target, Formatter* f)
631{
632 if (const auto entry = cache.get(target)) {
633 f->open_object_section("cache_entry");
634 f->dump_string("name", target.c_str());
635 entry->dump(f);
636 f->close_section();
637 return true;
638 } else {
639 return false;
640 }
641}
642
643template<typename T>
644bool RGWCache<T>::call_erase(const std::string& target)
645{
646 return cache.remove(target);
647}
648
649template<typename T>
650void RGWCache<T>::call_zap()
651{
652 cache.invalidate_all();
653}
7c673cae 654#endif