]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_sys_obj_cache.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / services / svc_sys_obj_cache.cc
CommitLineData
11fdf7f2
TL
1#include "svc_sys_obj_cache.h"
2#include "svc_zone.h"
3#include "svc_notify.h"
4
5#include "rgw/rgw_zone.h"
6#include "rgw/rgw_tools.h"
7
8#define dout_subsys ceph_subsys_rgw
9
10class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
11{
12 RGWSI_SysObj_Cache *svc;
13public:
14 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
15 int watch_cb(uint64_t notify_id,
16 uint64_t cookie,
17 uint64_t notifier_id,
18 bufferlist& bl) {
19 return svc->watch_cb(notify_id, cookie, notifier_id, bl);
20 }
21
22 void set_enabled(bool status) {
23 svc->set_enabled(status);
24 }
25};
26
27int RGWSI_SysObj_Cache::do_start()
28{
29 int r = RGWSI_SysObj_Core::do_start();
30 if (r < 0) {
31 return r;
32 }
33
34 r = notify_svc->start();
35 if (r < 0) {
36 return r;
37 }
38
39 assert(notify_svc->is_started());
40
41 cb.reset(new RGWSI_SysObj_Cache_CB(this));
42
43 notify_svc->register_watch_cb(cb.get());
44
45 return 0;
46}
47
48static string normal_name(rgw_pool& pool, const std::string& oid) {
49 std::string buf;
50 buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
51 buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
52 return buf;
53}
54
55void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
56{
57 if (src_obj.size()) {
58 dst_pool = src_pool;
59 dst_obj = src_obj;
60 } else {
61 dst_pool = zone_svc->get_zone_params().domain_root;
62 dst_obj = src_pool.name;
63 }
64}
65
66
67int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
68 RGWObjVersionTracker *objv_tracker,
69 const rgw_raw_obj& obj)
70
71{
72 rgw_pool pool;
73 string oid;
74 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
75
76 string name = normal_name(pool, oid);
77 cache.remove(name);
78
79 ObjectCacheInfo info;
80 int r = distribute_cache(name, obj, info, REMOVE_OBJ);
81 if (r < 0) {
82 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
83 }
84
85 return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
86}
87
88int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
89 GetObjState& read_state,
90 RGWObjVersionTracker *objv_tracker,
91 const rgw_raw_obj& obj,
92 bufferlist *obl, off_t ofs, off_t end,
93 map<string, bufferlist> *attrs,
94 bool raw_attrs,
95 rgw_cache_entry_info *cache_info,
96 boost::optional<obj_version> refresh_version)
97{
98 rgw_pool pool;
99 string oid;
100 if (ofs != 0) {
101 return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
102 obj, obl, ofs, end, attrs, raw_attrs,
103 cache_info, refresh_version);
104 }
105
106 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
107 string name = normal_name(pool, oid);
108
109 ObjectCacheInfo info;
110
111 uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0);
112 if (objv_tracker)
113 flags |= CACHE_FLAG_OBJV;
114 if (attrs)
115 flags |= CACHE_FLAG_XATTRS;
116
117 if ((cache.get(name, info, flags, cache_info) == 0) &&
118 (!refresh_version || !info.version.compare(&(*refresh_version)))) {
119 if (info.status < 0)
120 return info.status;
121
122 bufferlist& bl = info.data;
123
124 bufferlist::iterator i = bl.begin();
125
126 obl->clear();
127
128 i.copy_all(*obl);
129 if (objv_tracker)
130 objv_tracker->read_version = info.version;
131 if (attrs) {
132 if (raw_attrs) {
133 *attrs = info.xattrs;
134 } else {
135 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
136 }
137 }
138 return obl->length();
139 }
140
141 map<string, bufferlist> unfiltered_attrset;
142 int r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
143 obj, obl, ofs, end,
144 (attrs ? &unfiltered_attrset : nullptr),
145 true, /* cache unfiltered attrs */
146 cache_info,
147 refresh_version);
148 if (r < 0) {
149 if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
150 info.status = r;
151 cache.put(name, info, cache_info);
152 }
153 return r;
154 }
155
156 if (obl->length() == end + 1) {
157 /* in this case, most likely object contains more data, we can't cache it */
158 flags &= ~CACHE_FLAG_DATA;
159 } else {
160 bufferptr p(r);
161 bufferlist& bl = info.data;
162 bl.clear();
163 bufferlist::iterator o = obl->begin();
164 o.copy_all(bl);
165 }
166
167 info.status = 0;
168 info.flags = flags;
169 if (objv_tracker) {
170 info.version = objv_tracker->read_version;
171 }
172 if (attrs) {
173 info.xattrs = std::move(unfiltered_attrset);
174 if (raw_attrs) {
175 *attrs = info.xattrs;
176 } else {
177 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
178 }
179 }
180 cache.put(name, info, cache_info);
181 return r;
182}
183
184int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj,
185 const char *attr_name,
186 bufferlist *dest)
187{
188 rgw_pool pool;
189 string oid;
190
191 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
192 string name = normal_name(pool, oid);
193
194 ObjectCacheInfo info;
195
196 uint32_t flags = CACHE_FLAG_XATTRS;
197
198 if (cache.get(name, info, flags, nullptr) == 0) {
199 if (info.status < 0)
200 return info.status;
201
202 auto iter = info.xattrs.find(attr_name);
203 if (iter == info.xattrs.end()) {
204 return -ENODATA;
205 }
206
207 *dest = iter->second;
208 return dest->length();
209 }
210 /* don't try to cache this one */
211 return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest);
212}
213
214int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj,
215 map<string, bufferlist>& attrs,
216 map<string, bufferlist> *rmattrs,
217 RGWObjVersionTracker *objv_tracker)
218{
219 rgw_pool pool;
220 string oid;
221 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
222 ObjectCacheInfo info;
223 info.xattrs = attrs;
224 if (rmattrs) {
225 info.rm_xattrs = *rmattrs;
226 }
227 info.status = 0;
228 info.flags = CACHE_FLAG_MODIFY_XATTRS;
229 if (objv_tracker) {
230 info.version = objv_tracker->write_version;
231 info.flags |= CACHE_FLAG_OBJV;
232 }
233 int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker);
234 string name = normal_name(pool, oid);
235 if (ret >= 0) {
236 cache.put(name, info, NULL);
237 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
238 if (r < 0)
239 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
240 } else {
241 cache.remove(name);
242 }
243
244 return ret;
245}
246
247int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
248 real_time *pmtime,
249 map<std::string, bufferlist>& attrs,
250 bool exclusive,
251 const bufferlist& data,
252 RGWObjVersionTracker *objv_tracker,
253 real_time set_mtime)
254{
255 rgw_pool pool;
256 string oid;
257 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
258 ObjectCacheInfo info;
259 info.xattrs = attrs;
260 info.status = 0;
261 info.data = data;
262 info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
263 if (objv_tracker) {
264 info.version = objv_tracker->write_version;
265 info.flags |= CACHE_FLAG_OBJV;
266 }
267 ceph::real_time result_mtime;
268 int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs,
269 exclusive, data,
270 objv_tracker, set_mtime);
271 if (pmtime) {
272 *pmtime = result_mtime;
273 }
274 info.meta.mtime = result_mtime;
275 info.meta.size = data.length();
276 string name = normal_name(pool, oid);
277 if (ret >= 0) {
278 cache.put(name, info, NULL);
279 // Only distribute the cache information if we did not just create
280 // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies
281 // PUT_OBJ_CREATE. Generally speaking, when successfully creating
282 // a system object with the exclusive flag it is not necessary to
283 // call distribute_cache, as a) it's unclear whether other RGWs
284 // will need that system object in the near-term and b) it
285 // generates additional network traffic.
286 if (!exclusive) {
287 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
288 if (r < 0)
289 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
290 }
291 } else {
292 cache.remove(name);
293 }
294
295 return ret;
296}
297
298int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
299 const bufferlist& data,
300 bool exclusive,
301 RGWObjVersionTracker *objv_tracker)
302{
303 rgw_pool pool;
304 string oid;
305 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
306
307 ObjectCacheInfo info;
308 info.data = data;
309 info.meta.size = data.length();
310 info.status = 0;
311 info.flags = CACHE_FLAG_DATA;
312
313 if (objv_tracker) {
314 info.version = objv_tracker->write_version;
315 info.flags |= CACHE_FLAG_OBJV;
316 }
317 int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker);
318 string name = normal_name(pool, oid);
319 if (ret >= 0) {
320 cache.put(name, info, NULL);
321 int r = distribute_cache(name, obj, info, UPDATE_OBJ);
322 if (r < 0)
323 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
324 } else {
325 cache.remove(name);
326 }
327
328 return ret;
329}
330
331int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
332 map<string, bufferlist> *attrs, bufferlist *first_chunk,
333 RGWObjVersionTracker *objv_tracker)
334{
335 rgw_pool pool;
336 string oid;
337 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
338
339 string name = normal_name(pool, oid);
340
341 uint64_t size;
342 real_time mtime;
343 uint64_t epoch;
344
345 ObjectCacheInfo info;
346 uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
347 if (objv_tracker)
348 flags |= CACHE_FLAG_OBJV;
349 int r = cache.get(name, info, flags, NULL);
350 if (r == 0) {
351 if (info.status < 0)
352 return info.status;
353
354 size = info.meta.size;
355 mtime = info.meta.mtime;
356 epoch = info.epoch;
357 if (objv_tracker)
358 objv_tracker->read_version = info.version;
359 goto done;
360 }
361 r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
362 if (r < 0) {
363 if (r == -ENOENT) {
364 info.status = r;
365 cache.put(name, info, NULL);
366 }
367 return r;
368 }
369 info.status = 0;
370 info.epoch = epoch;
371 info.meta.mtime = mtime;
372 info.meta.size = size;
373 info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
374 if (objv_tracker) {
375 info.flags |= CACHE_FLAG_OBJV;
376 info.version = objv_tracker->read_version;
377 }
378 cache.put(name, info, NULL);
379done:
380 if (psize)
381 *psize = size;
382 if (pmtime)
383 *pmtime = mtime;
384 if (pepoch)
385 *pepoch = epoch;
386 if (attrs)
387 *attrs = info.xattrs;
388 return 0;
389}
390
391int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
392{
393 RGWCacheNotifyInfo info;
394
395 info.op = op;
396
397 info.obj_info = obj_info;
398 info.obj = obj;
399 bufferlist bl;
400 encode(info, bl);
401 return notify_svc->distribute(normal_name, bl);
402}
403
404int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
405 uint64_t cookie,
406 uint64_t notifier_id,
407 bufferlist& bl)
408{
409 RGWCacheNotifyInfo info;
410
411 try {
412 auto iter = bl.cbegin();
413 decode(info, iter);
414 } catch (buffer::end_of_buffer& err) {
415 ldout(cct, 0) << "ERROR: got bad notification" << dendl;
416 return -EIO;
417 } catch (buffer::error& err) {
418 ldout(cct, 0) << "ERROR: buffer::error" << dendl;
419 return -EIO;
420 }
421
422 rgw_pool pool;
423 string oid;
424 normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
425 string name = normal_name(pool, oid);
426
427 switch (info.op) {
428 case UPDATE_OBJ:
429 cache.put(name, info.obj_info, NULL);
430 break;
431 case REMOVE_OBJ:
432 cache.remove(name);
433 break;
434 default:
435 ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
436 return -EINVAL;
437 }
438
439 return 0;
440}
441
442void RGWSI_SysObj_Cache::set_enabled(bool status)
443{
444 cache.set_enabled(status);
445}
446
447bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
448 RGWChainedCache::Entry *chained_entry)
449{
450 return cache.chain_cache_entry(cache_info_entries, chained_entry);
451}
452
453void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
454{
455 cache.chain_cache(cc);
456}
457
458void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache *cc)
459{
460 cache.unchain_cache(cc);
461}
462
463static void cache_list_dump_helper(Formatter* f,
464 const std::string& name,
465 const ceph::real_time mtime,
466 const std::uint64_t size)
467{
468 f->dump_string("name", name);
469 f->dump_string("mtime", ceph::to_iso_8601(mtime));
470 f->dump_unsigned("size", size);
471}
472
473void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
474{
475 cache.for_each(
476 [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
477 if (!filter || name.find(*filter) != name.npos) {
478 cache_list_dump_helper(f, name, entry.info.meta.mtime,
479 entry.info.meta.size);
480 }
481 });
482}
483
484int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f)
485{
486 if (const auto entry = cache.get(target)) {
487 f->open_object_section("cache_entry");
488 f->dump_string("name", target.c_str());
489 entry->dump(f);
490 f->close_section();
491 return true;
492 } else {
493 return false;
494 }
495}
496
497int RGWSI_SysObj_Cache::call_erase(const std::string& target)
498{
499 return cache.remove(target);
500}
501
502int RGWSI_SysObj_Cache::call_zap()
503{
504 cache.invalidate_all();
505 return 0;
506}