]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cache.h
bump version to 12.2.3-pve1
[ceph.git] / ceph / src / rgw / rgw_cache.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_RGWCACHE_H
5#define CEPH_RGWCACHE_H
6
7#include "rgw_rados.h"
8#include <string>
9#include <map>
10#include "include/types.h"
11#include "include/utime.h"
12#include "include/assert.h"
13#include "common/RWLock.h"
14
15enum {
16 UPDATE_OBJ,
17 REMOVE_OBJ,
18};
19
20#define CACHE_FLAG_DATA 0x01
21#define CACHE_FLAG_XATTRS 0x02
22#define CACHE_FLAG_META 0x04
23#define CACHE_FLAG_MODIFY_XATTRS 0x08
24#define CACHE_FLAG_OBJV 0x10
25
26#define mydout(v) lsubdout(T::cct, rgw, v)
27
28struct ObjectMetaInfo {
29 uint64_t size;
30 real_time mtime;
31
32 ObjectMetaInfo() : size(0) {}
33
34 void encode(bufferlist& bl) const {
35 ENCODE_START(2, 2, bl);
36 ::encode(size, bl);
37 ::encode(mtime, bl);
38 ENCODE_FINISH(bl);
39 }
40 void decode(bufferlist::iterator& bl) {
41 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
42 ::decode(size, bl);
43 ::decode(mtime, bl);
44 DECODE_FINISH(bl);
45 }
46 void dump(Formatter *f) const;
47 static void generate_test_instances(list<ObjectMetaInfo*>& o);
48};
49WRITE_CLASS_ENCODER(ObjectMetaInfo)
50
51struct ObjectCacheInfo {
b32b8144
FG
52 int status = 0;
53 uint32_t flags = 0;
54 uint64_t epoch = 0;
7c673cae
FG
55 bufferlist data;
56 map<string, bufferlist> xattrs;
57 map<string, bufferlist> rm_xattrs;
58 ObjectMetaInfo meta;
b32b8144
FG
59 obj_version version = {};
60 ceph::coarse_mono_time time_added = ceph::coarse_mono_clock::now();
7c673cae 61
b32b8144 62 ObjectCacheInfo() = default;
7c673cae
FG
63
64 void encode(bufferlist& bl) const {
65 ENCODE_START(5, 3, bl);
66 ::encode(status, bl);
67 ::encode(flags, bl);
68 ::encode(data, bl);
69 ::encode(xattrs, bl);
70 ::encode(meta, bl);
71 ::encode(rm_xattrs, bl);
72 ::encode(epoch, bl);
73 ::encode(version, bl);
74 ENCODE_FINISH(bl);
75 }
76 void decode(bufferlist::iterator& bl) {
77 DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl);
78 ::decode(status, bl);
79 ::decode(flags, bl);
80 ::decode(data, bl);
81 ::decode(xattrs, bl);
82 ::decode(meta, bl);
83 if (struct_v >= 2)
84 ::decode(rm_xattrs, bl);
85 if (struct_v >= 4)
86 ::decode(epoch, bl);
87 if (struct_v >= 5)
88 ::decode(version, bl);
89 DECODE_FINISH(bl);
90 }
91 void dump(Formatter *f) const;
92 static void generate_test_instances(list<ObjectCacheInfo*>& o);
93};
94WRITE_CLASS_ENCODER(ObjectCacheInfo)
95
96struct RGWCacheNotifyInfo {
97 uint32_t op;
98 rgw_raw_obj obj;
99 ObjectCacheInfo obj_info;
100 off_t ofs;
101 string ns;
102
103 RGWCacheNotifyInfo() : op(0), ofs(0) {}
104
105 void encode(bufferlist& obl) const {
106 ENCODE_START(2, 2, obl);
107 ::encode(op, obl);
108 ::encode(obj, obl);
109 ::encode(obj_info, obl);
110 ::encode(ofs, obl);
111 ::encode(ns, obl);
112 ENCODE_FINISH(obl);
113 }
114 void decode(bufferlist::iterator& ibl) {
115 DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, ibl);
116 ::decode(op, ibl);
117 ::decode(obj, ibl);
118 ::decode(obj_info, ibl);
119 ::decode(ofs, ibl);
120 ::decode(ns, ibl);
121 DECODE_FINISH(ibl);
122 }
123 void dump(Formatter *f) const;
124 static void generate_test_instances(list<RGWCacheNotifyInfo*>& o);
125};
126WRITE_CLASS_ENCODER(RGWCacheNotifyInfo)
127
128struct ObjectCacheEntry {
129 ObjectCacheInfo info;
130 std::list<string>::iterator lru_iter;
131 uint64_t lru_promotion_ts;
132 uint64_t gen;
133 std::list<pair<RGWChainedCache *, string> > chained_entries;
134
135 ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
136};
137
138class ObjectCache {
139 std::map<string, ObjectCacheEntry> cache_map;
140 std::list<string> lru;
141 unsigned long lru_size;
142 unsigned long lru_counter;
143 unsigned long lru_window;
144 RWLock lock;
145 CephContext *cct;
146
147 list<RGWChainedCache *> chained_cache;
148
149 bool enabled;
b32b8144 150 ceph::timespan expiry;
7c673cae
FG
151
152 void touch_lru(string& name, ObjectCacheEntry& entry, std::list<string>::iterator& lru_iter);
153 void remove_lru(string& name, std::list<string>::iterator& lru_iter);
b32b8144 154 void invalidate_lru(ObjectCacheEntry& entry);
7c673cae
FG
155
156 void do_invalidate_all();
157public:
158 ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { }
159 int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
160 void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info);
161 void remove(std::string& name);
162 void set_ctx(CephContext *_cct) {
163 cct = _cct;
164 lru_window = cct->_conf->rgw_cache_lru_size / 2;
b32b8144
FG
165 expiry = std::chrono::seconds(cct->_conf->get_val<uint64_t>(
166 "rgw_cache_expiry_interval"));
7c673cae
FG
167 }
168 bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry);
169
170 void set_enabled(bool status);
171
172 void chain_cache(RGWChainedCache *cache);
173 void invalidate_all();
174};
175
176template <class T>
177class RGWCache : public T
178{
179 ObjectCache cache;
180
181 int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) {
182 return T::list_objects_raw_init(pool, handle);
183 }
184 int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) {
185 return T::list_objects_raw_next(obj, handle);
186 }
187
188 string normal_name(rgw_pool& pool, const std::string& oid) {
189 std::string buf;
190 buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
191 buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
192 return buf;
193 }
194
195 void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
196 string normal_name(rgw_raw_obj& obj) {
197 return normal_name(obj.pool, obj.oid);
198 }
199
200 int init_rados() override {
201 int ret;
202 cache.set_ctx(T::cct);
203 ret = T::init_rados();
204 if (ret < 0)
205 return ret;
206
207 return 0;
208 }
209
210 bool need_watch_notify() override {
211 return true;
212 }
213
214 int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op);
215 int watch_cb(uint64_t notify_id,
216 uint64_t cookie,
217 uint64_t notifier_id,
218 bufferlist& bl) override;
219
220 void set_cache_enabled(bool state) override {
221 cache.set_enabled(state);
222 }
223public:
224 RGWCache() {}
225
226 void register_chained_cache(RGWChainedCache *cc) override {
227 cache.chain_cache(cc);
228 }
229
230 int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
231 map<string, bufferlist>& attrs,
232 map<string, bufferlist>* rmattrs,
233 RGWObjVersionTracker *objv_tracker);
234 int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
235 map<std::string, bufferlist>& attrs, int flags,
236 bufferlist& data,
237 RGWObjVersionTracker *objv_tracker,
238 real_time set_mtime) override;
239 int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive,
240 RGWObjVersionTracker *objv_tracker = nullptr) override;
241
242 int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
243 RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
244 bufferlist& bl, off_t ofs, off_t end,
245 map<string, bufferlist> *attrs,
b32b8144
FG
246 rgw_cache_entry_info *cache_info,
247 boost::optional<obj_version> refresh_version = boost::none) override;
7c673cae
FG
248
249 int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
250 bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override;
251
252 int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override;
253
254 bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) override {
255 return cache.chain_cache_entry(cache_info_entries, chained_entry);
256 }
257};
258
259template <class T>
260void RGWCache<T>::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
261{
262 if (src_obj.size()) {
263 dst_pool = src_pool;
264 dst_obj = src_obj;
265 } else {
266 dst_pool = T::get_zone_params().domain_root;
267 dst_obj = src_pool.name;
268 }
269}
270
271template <class T>
272int RGWCache<T>::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
273{
274 rgw_pool pool;
275 string oid;
276 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
277
278 string name = normal_name(obj);
279 cache.remove(name);
280
281 ObjectCacheInfo info;
282 distribute_cache(name, obj, info, REMOVE_OBJ);
283
284 return T::delete_system_obj(obj, objv_tracker);
285}
286
287template <class T>
288int RGWCache<T>::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
289 RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
290 bufferlist& obl, off_t ofs, off_t end,
291 map<string, bufferlist> *attrs,
b32b8144
FG
292 rgw_cache_entry_info *cache_info,
293 boost::optional<obj_version> refresh_version)
7c673cae
FG
294{
295 rgw_pool pool;
296 string oid;
297 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
298 if (ofs != 0)
299 return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
300
301 string name = normal_name(obj.pool, oid);
302
303 ObjectCacheInfo info;
304
305 uint32_t flags = CACHE_FLAG_DATA;
306 if (objv_tracker)
307 flags |= CACHE_FLAG_OBJV;
308 if (attrs)
309 flags |= CACHE_FLAG_XATTRS;
b32b8144
FG
310
311 if ((cache.get(name, info, flags, cache_info) == 0) &&
312 (!refresh_version || !info.version.compare(&(*refresh_version)))) {
7c673cae
FG
313 if (info.status < 0)
314 return info.status;
315
316 bufferlist& bl = info.data;
317
318 bufferlist::iterator i = bl.begin();
319
320 obl.clear();
321
322 i.copy_all(obl);
323 if (objv_tracker)
324 objv_tracker->read_version = info.version;
325 if (attrs)
326 *attrs = info.xattrs;
327 return bl.length();
328 }
329 int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
330 if (r < 0) {
331 if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
332 info.status = r;
333 cache.put(name, info, cache_info);
334 }
335 return r;
336 }
337
338 if (obl.length() == end + 1) {
339 /* in this case, most likely object contains more data, we can't cache it */
340 return r;
341 }
342
343 bufferptr p(r);
344 bufferlist& bl = info.data;
345 bl.clear();
346 bufferlist::iterator o = obl.begin();
347 o.copy_all(bl);
348 info.status = 0;
349 info.flags = flags;
350 if (objv_tracker) {
351 info.version = objv_tracker->read_version;
352 }
353 if (attrs) {
354 info.xattrs = *attrs;
355 }
356 cache.put(name, info, cache_info);
357 return r;
358}
359
360template <class T>
361int RGWCache<T>::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
362 map<string, bufferlist>& attrs,
363 map<string, bufferlist>* rmattrs,
364 RGWObjVersionTracker *objv_tracker)
365{
366 rgw_pool pool;
367 string oid;
368 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
369 ObjectCacheInfo info;
370 info.xattrs = attrs;
371 if (rmattrs)
372 info.rm_xattrs = *rmattrs;
373 info.status = 0;
374 info.flags = CACHE_FLAG_MODIFY_XATTRS;
375 if (objv_tracker) {
376 info.version = objv_tracker->write_version;
377 info.flags |= CACHE_FLAG_OBJV;
378 }
379 int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker);
380 string name = normal_name(pool, oid);
381 if (ret >= 0) {
382 cache.put(name, info, NULL);
383 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
384 if (r < 0)
385 mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
386 } else {
387 cache.remove(name);
388 }
389
390 return ret;
391}
392
393template <class T>
394int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
395 map<std::string, bufferlist>& attrs, int flags,
396 bufferlist& data,
397 RGWObjVersionTracker *objv_tracker,
398 real_time set_mtime)
399{
400 rgw_pool pool;
401 string oid;
402 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
403 ObjectCacheInfo info;
404 info.xattrs = attrs;
405 info.status = 0;
406 info.data = data;
407 info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
408 if (objv_tracker) {
409 info.version = objv_tracker->write_version;
410 info.flags |= CACHE_FLAG_OBJV;
411 }
412 ceph::real_time result_mtime;
413 int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data,
414 objv_tracker, set_mtime);
415 if (mtime) {
416 *mtime = result_mtime;
417 }
418 info.meta.mtime = result_mtime;
419 info.meta.size = size;
420 string name = normal_name(pool, oid);
421 if (ret >= 0) {
422 cache.put(name, info, NULL);
b32b8144
FG
423 // Only distribute the cache information if we did not just create
424 // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies
425 // PUT_OBJ_CREATE. Generally speaking, when successfully creating
426 // a system object with the exclusive flag it is not necessary to
427 // call distribute_cache, as a) it's unclear whether other RGWs
428 // will need that system object in the near-term and b) it
429 // generates additional network traffic.
430 if (!(flags & PUT_OBJ_EXCL)) {
431 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
432 if (r < 0)
433 mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
434 }
7c673cae
FG
435 } else {
436 cache.remove(name);
437 }
438
439 return ret;
440}
441
442template <class T>
443int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive,
444 RGWObjVersionTracker *objv_tracker)
445{
446 rgw_pool pool;
447 string oid;
448 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
449 ObjectCacheInfo info;
450 bool cacheable = false;
451 if ((ofs == 0) || (ofs == -1)) {
452 cacheable = true;
453 info.data = data;
454 info.meta.size = data.length();
455 info.status = 0;
456 info.flags = CACHE_FLAG_DATA;
457 }
458 if (objv_tracker) {
459 info.version = objv_tracker->write_version;
460 info.flags |= CACHE_FLAG_OBJV;
461 }
462 int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
463 if (cacheable) {
464 string name = normal_name(pool, oid);
465 if (ret >= 0) {
466 cache.put(name, info, NULL);
467 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
468 if (r < 0)
469 mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
470 } else {
471 cache.remove(name);
472 }
473 }
474
475 return ret;
476}
477
478template <class T>
479int RGWCache<T>::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
480 uint64_t *pepoch, map<string, bufferlist> *attrs,
481 bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker)
482{
483 rgw_pool pool;
484 string oid;
485 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
486
487 string name = normal_name(pool, oid);
488
489 uint64_t size;
490 real_time mtime;
491 uint64_t epoch;
492
493 ObjectCacheInfo info;
494 uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
495 if (objv_tracker)
496 flags |= CACHE_FLAG_OBJV;
497 int r = cache.get(name, info, flags, NULL);
498 if (r == 0) {
499 if (info.status < 0)
500 return info.status;
501
502 size = info.meta.size;
503 mtime = info.meta.mtime;
504 epoch = info.epoch;
505 if (objv_tracker)
506 objv_tracker->read_version = info.version;
507 goto done;
508 }
509 r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
510 if (r < 0) {
511 if (r == -ENOENT) {
512 info.status = r;
513 cache.put(name, info, NULL);
514 }
515 return r;
516 }
517 info.status = 0;
518 info.epoch = epoch;
519 info.meta.mtime = mtime;
520 info.meta.size = size;
521 info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
522 if (objv_tracker) {
523 info.flags |= CACHE_FLAG_OBJV;
524 info.version = objv_tracker->read_version;
525 }
526 cache.put(name, info, NULL);
527done:
528 if (psize)
529 *psize = size;
530 if (pmtime)
531 *pmtime = mtime;
532 if (pepoch)
533 *pepoch = epoch;
534 if (attrs)
535 *attrs = info.xattrs;
536 return 0;
537}
538
539template <class T>
540int RGWCache<T>::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
541{
542 RGWCacheNotifyInfo info;
543
544 info.op = op;
545
546 info.obj_info = obj_info;
547 info.obj = obj;
548 bufferlist bl;
549 ::encode(info, bl);
550 return T::distribute(normal_name, bl);
551}
552
553template <class T>
554int RGWCache<T>::watch_cb(uint64_t notify_id,
555 uint64_t cookie,
556 uint64_t notifier_id,
557 bufferlist& bl)
558{
559 RGWCacheNotifyInfo info;
560
561 try {
562 bufferlist::iterator iter = bl.begin();
563 ::decode(info, iter);
564 } catch (buffer::end_of_buffer& err) {
565 mydout(0) << "ERROR: got bad notification" << dendl;
566 return -EIO;
567 } catch (buffer::error& err) {
568 mydout(0) << "ERROR: buffer::error" << dendl;
569 return -EIO;
570 }
571
572 rgw_pool pool;
573 string oid;
574 normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
575 string name = normal_name(pool, oid);
576
577 switch (info.op) {
578 case UPDATE_OBJ:
579 cache.put(name, info.obj_info, NULL);
580 break;
581 case REMOVE_OBJ:
582 cache.remove(name);
583 break;
584 default:
585 mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
586 return -EINVAL;
587 }
588
589 return 0;
590}
591
592#endif