]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/services/svc_sys_obj_cache.cc
3453f43e16ad1b0c0e0be5f9d59cad63ead29012
[ceph.git] / ceph / src / rgw / services / svc_sys_obj_cache.cc
1
2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab ft=cpp
4
5 #include "common/admin_socket.h"
6
7 #include "svc_sys_obj_cache.h"
8 #include "svc_zone.h"
9 #include "svc_notify.h"
10
11 #include "rgw/rgw_zone.h"
12 #include "rgw/rgw_tools.h"
13
14 #define dout_subsys ceph_subsys_rgw
15
16 using namespace std;
17
18 class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
19 {
20 RGWSI_SysObj_Cache *svc;
21 public:
22 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
23 int watch_cb(const DoutPrefixProvider *dpp,
24 uint64_t notify_id,
25 uint64_t cookie,
26 uint64_t notifier_id,
27 bufferlist& bl) {
28 return svc->watch_cb(dpp, notify_id, cookie, notifier_id, bl);
29 }
30
31 void set_enabled(bool status) {
32 svc->set_enabled(status);
33 }
34 };
35
36 int RGWSI_SysObj_Cache::do_start(optional_yield y, const DoutPrefixProvider *dpp)
37 {
38 int r = asocket.start();
39 if (r < 0) {
40 return r;
41 }
42
43 r = RGWSI_SysObj_Core::do_start(y, dpp);
44 if (r < 0) {
45 return r;
46 }
47
48 r = notify_svc->start(y, dpp);
49 if (r < 0) {
50 return r;
51 }
52
53 assert(notify_svc->is_started());
54
55 cb.reset(new RGWSI_SysObj_Cache_CB(this));
56
57 notify_svc->register_watch_cb(cb.get());
58
59 return 0;
60 }
61
62 void RGWSI_SysObj_Cache::shutdown()
63 {
64 asocket.shutdown();
65 RGWSI_SysObj_Core::shutdown();
66 }
67
68 static string normal_name(rgw_pool& pool, const std::string& oid) {
69 std::string buf;
70 buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
71 buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
72 return buf;
73 }
74
75 void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
76 {
77 if (src_obj.size()) {
78 dst_pool = src_pool;
79 dst_obj = src_obj;
80 } else {
81 dst_pool = zone_svc->get_zone_params().domain_root;
82 dst_obj = src_pool.name;
83 }
84 }
85
86
87 int RGWSI_SysObj_Cache::remove(const DoutPrefixProvider *dpp,
88 RGWSysObjectCtxBase& obj_ctx,
89 RGWObjVersionTracker *objv_tracker,
90 const rgw_raw_obj& obj,
91 optional_yield y)
92
93 {
94 rgw_pool pool;
95 string oid;
96 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
97
98 string name = normal_name(pool, oid);
99 cache.invalidate_remove(dpp, name);
100
101 ObjectCacheInfo info;
102 int r = distribute_cache(dpp, name, obj, info, INVALIDATE_OBJ, y);
103 if (r < 0) {
104 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
105 }
106
107 return RGWSI_SysObj_Core::remove(dpp, obj_ctx, objv_tracker, obj, y);
108 }
109
110 int RGWSI_SysObj_Cache::read(const DoutPrefixProvider *dpp,
111 RGWSysObjectCtxBase& obj_ctx,
112 RGWSI_SysObj_Obj_GetObjState& read_state,
113 RGWObjVersionTracker *objv_tracker,
114 const rgw_raw_obj& obj,
115 bufferlist *obl, off_t ofs, off_t end,
116 map<string, bufferlist> *attrs,
117 bool raw_attrs,
118 rgw_cache_entry_info *cache_info,
119 boost::optional<obj_version> refresh_version,
120 optional_yield y)
121 {
122 rgw_pool pool;
123 string oid;
124 if (ofs != 0) {
125 return RGWSI_SysObj_Core::read(dpp, obj_ctx, read_state, objv_tracker,
126 obj, obl, ofs, end, attrs, raw_attrs,
127 cache_info, refresh_version, y);
128 }
129
130 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
131 string name = normal_name(pool, oid);
132
133 ObjectCacheInfo info;
134
135 uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0);
136 if (objv_tracker)
137 flags |= CACHE_FLAG_OBJV;
138 if (attrs)
139 flags |= CACHE_FLAG_XATTRS;
140
141 int r = cache.get(dpp, name, info, flags, cache_info);
142 if (r == 0 &&
143 (!refresh_version || !info.version.compare(&(*refresh_version)))) {
144 if (info.status < 0)
145 return info.status;
146
147 bufferlist& bl = info.data;
148
149 bufferlist::iterator i = bl.begin();
150
151 obl->clear();
152
153 i.copy_all(*obl);
154 if (objv_tracker)
155 objv_tracker->read_version = info.version;
156 if (attrs) {
157 if (raw_attrs) {
158 *attrs = info.xattrs;
159 } else {
160 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
161 }
162 }
163 return obl->length();
164 }
165 if(r == -ENODATA)
166 return -ENOENT;
167
168 map<string, bufferlist> unfiltered_attrset;
169 r = RGWSI_SysObj_Core::read(dpp, obj_ctx, read_state, objv_tracker,
170 obj, obl, ofs, end,
171 (attrs ? &unfiltered_attrset : nullptr),
172 true, /* cache unfiltered attrs */
173 cache_info,
174 refresh_version, y);
175 if (r < 0) {
176 if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
177 info.status = r;
178 cache.put(dpp, name, info, cache_info);
179 }
180 return r;
181 }
182
183 if (obl->length() == end + 1) {
184 /* in this case, most likely object contains more data, we can't cache it */
185 flags &= ~CACHE_FLAG_DATA;
186 } else {
187 bufferptr p(r);
188 bufferlist& bl = info.data;
189 bl.clear();
190 bufferlist::iterator o = obl->begin();
191 o.copy_all(bl);
192 }
193
194 info.status = 0;
195 info.flags = flags;
196 if (objv_tracker) {
197 info.version = objv_tracker->read_version;
198 }
199 if (attrs) {
200 info.xattrs = std::move(unfiltered_attrset);
201 if (raw_attrs) {
202 *attrs = info.xattrs;
203 } else {
204 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
205 }
206 }
207 cache.put(dpp, name, info, cache_info);
208 return r;
209 }
210
211 int RGWSI_SysObj_Cache::get_attr(const DoutPrefixProvider *dpp,
212 const rgw_raw_obj& obj,
213 const char *attr_name,
214 bufferlist *dest,
215 optional_yield y)
216 {
217 rgw_pool pool;
218 string oid;
219
220 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
221 string name = normal_name(pool, oid);
222
223 ObjectCacheInfo info;
224
225 uint32_t flags = CACHE_FLAG_XATTRS;
226
227 int r = cache.get(dpp, name, info, flags, nullptr);
228 if (r == 0) {
229 if (info.status < 0)
230 return info.status;
231
232 auto iter = info.xattrs.find(attr_name);
233 if (iter == info.xattrs.end()) {
234 return -ENODATA;
235 }
236
237 *dest = iter->second;
238 return dest->length();
239 } else if (r == -ENODATA) {
240 return -ENOENT;
241 }
242 /* don't try to cache this one */
243 return RGWSI_SysObj_Core::get_attr(dpp, obj, attr_name, dest, y);
244 }
245
246 int RGWSI_SysObj_Cache::set_attrs(const DoutPrefixProvider *dpp,
247 const rgw_raw_obj& obj,
248 map<string, bufferlist>& attrs,
249 map<string, bufferlist> *rmattrs,
250 RGWObjVersionTracker *objv_tracker,
251 optional_yield y)
252 {
253 rgw_pool pool;
254 string oid;
255 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
256 ObjectCacheInfo info;
257 info.xattrs = attrs;
258 if (rmattrs) {
259 info.rm_xattrs = *rmattrs;
260 }
261 info.status = 0;
262 info.flags = CACHE_FLAG_MODIFY_XATTRS;
263 int ret = RGWSI_SysObj_Core::set_attrs(dpp, obj, attrs, rmattrs, objv_tracker, y);
264 string name = normal_name(pool, oid);
265 if (ret >= 0) {
266 if (objv_tracker && objv_tracker->read_version.ver) {
267 info.version = objv_tracker->read_version;
268 info.flags |= CACHE_FLAG_OBJV;
269 }
270 cache.put(dpp, name, info, NULL);
271 int r = distribute_cache(dpp, name, obj, info, UPDATE_OBJ, y);
272 if (r < 0)
273 ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
274 } else {
275 cache.invalidate_remove(dpp, name);
276 }
277
278 return ret;
279 }
280
281 int RGWSI_SysObj_Cache::write(const DoutPrefixProvider *dpp,
282 const rgw_raw_obj& obj,
283 real_time *pmtime,
284 map<std::string, bufferlist>& attrs,
285 bool exclusive,
286 const bufferlist& data,
287 RGWObjVersionTracker *objv_tracker,
288 real_time set_mtime,
289 optional_yield y)
290 {
291 rgw_pool pool;
292 string oid;
293 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
294 ObjectCacheInfo info;
295 info.xattrs = attrs;
296 info.status = 0;
297 info.data = data;
298 info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
299 ceph::real_time result_mtime;
300 int ret = RGWSI_SysObj_Core::write(dpp, obj, &result_mtime, attrs,
301 exclusive, data,
302 objv_tracker, set_mtime, y);
303 if (pmtime) {
304 *pmtime = result_mtime;
305 }
306 if (objv_tracker && objv_tracker->read_version.ver) {
307 info.version = objv_tracker->read_version;
308 info.flags |= CACHE_FLAG_OBJV;
309 }
310 info.meta.mtime = result_mtime;
311 info.meta.size = data.length();
312 string name = normal_name(pool, oid);
313 if (ret >= 0) {
314 cache.put(dpp, name, info, NULL);
315 int r = distribute_cache(dpp, name, obj, info, UPDATE_OBJ, y);
316 if (r < 0)
317 ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
318 } else {
319 cache.invalidate_remove(dpp, name);
320 }
321
322 return ret;
323 }
324
325 int RGWSI_SysObj_Cache::write_data(const DoutPrefixProvider *dpp,
326 const rgw_raw_obj& obj,
327 const bufferlist& data,
328 bool exclusive,
329 RGWObjVersionTracker *objv_tracker,
330 optional_yield y)
331 {
332 rgw_pool pool;
333 string oid;
334 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
335
336 ObjectCacheInfo info;
337 info.data = data;
338 info.meta.size = data.length();
339 info.status = 0;
340 info.flags = CACHE_FLAG_DATA;
341
342 int ret = RGWSI_SysObj_Core::write_data(dpp, obj, data, exclusive, objv_tracker, y);
343 string name = normal_name(pool, oid);
344 if (ret >= 0) {
345 if (objv_tracker && objv_tracker->read_version.ver) {
346 info.version = objv_tracker->read_version;
347 info.flags |= CACHE_FLAG_OBJV;
348 }
349 cache.put(dpp, name, info, NULL);
350 int r = distribute_cache(dpp, name, obj, info, UPDATE_OBJ, y);
351 if (r < 0)
352 ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
353 } else {
354 cache.invalidate_remove(dpp, name);
355 }
356
357 return ret;
358 }
359
360 int RGWSI_SysObj_Cache::raw_stat(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
361 map<string, bufferlist> *attrs, bufferlist *first_chunk,
362 RGWObjVersionTracker *objv_tracker,
363 optional_yield y)
364 {
365 rgw_pool pool;
366 string oid;
367 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
368
369 string name = normal_name(pool, oid);
370
371 uint64_t size;
372 real_time mtime;
373 uint64_t epoch;
374
375 ObjectCacheInfo info;
376 uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
377 if (objv_tracker)
378 flags |= CACHE_FLAG_OBJV;
379 int r = cache.get(dpp, name, info, flags, NULL);
380 if (r == 0) {
381 if (info.status < 0)
382 return info.status;
383
384 size = info.meta.size;
385 mtime = info.meta.mtime;
386 epoch = info.epoch;
387 if (objv_tracker)
388 objv_tracker->read_version = info.version;
389 goto done;
390 }
391 if (r == -ENODATA) {
392 return -ENOENT;
393 }
394 r = RGWSI_SysObj_Core::raw_stat(dpp, obj, &size, &mtime, &epoch, &info.xattrs,
395 first_chunk, objv_tracker, y);
396 if (r < 0) {
397 if (r == -ENOENT) {
398 info.status = r;
399 cache.put(dpp, name, info, NULL);
400 }
401 return r;
402 }
403 info.status = 0;
404 info.epoch = epoch;
405 info.meta.mtime = mtime;
406 info.meta.size = size;
407 info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
408 if (objv_tracker) {
409 info.flags |= CACHE_FLAG_OBJV;
410 info.version = objv_tracker->read_version;
411 }
412 cache.put(dpp, name, info, NULL);
413 done:
414 if (psize)
415 *psize = size;
416 if (pmtime)
417 *pmtime = mtime;
418 if (pepoch)
419 *pepoch = epoch;
420 if (attrs)
421 *attrs = info.xattrs;
422 return 0;
423 }
424
425 int RGWSI_SysObj_Cache::distribute_cache(const DoutPrefixProvider *dpp,
426 const string& normal_name,
427 const rgw_raw_obj& obj,
428 ObjectCacheInfo& obj_info, int op,
429 optional_yield y)
430 {
431 RGWCacheNotifyInfo info;
432 info.op = op;
433 info.obj_info = obj_info;
434 info.obj = obj;
435 return notify_svc->distribute(dpp, normal_name, info, y);
436 }
437
438 int RGWSI_SysObj_Cache::watch_cb(const DoutPrefixProvider *dpp,
439 uint64_t notify_id,
440 uint64_t cookie,
441 uint64_t notifier_id,
442 bufferlist& bl)
443 {
444 RGWCacheNotifyInfo info;
445
446 try {
447 auto iter = bl.cbegin();
448 decode(info, iter);
449 } catch (buffer::end_of_buffer& err) {
450 ldpp_dout(dpp, 0) << "ERROR: got bad notification" << dendl;
451 return -EIO;
452 } catch (buffer::error& err) {
453 ldpp_dout(dpp, 0) << "ERROR: buffer::error" << dendl;
454 return -EIO;
455 }
456
457 rgw_pool pool;
458 string oid;
459 normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
460 string name = normal_name(pool, oid);
461
462 switch (info.op) {
463 case UPDATE_OBJ:
464 cache.put(dpp, name, info.obj_info, NULL);
465 break;
466 case INVALIDATE_OBJ:
467 cache.invalidate_remove(dpp, name);
468 break;
469 default:
470 ldpp_dout(dpp, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
471 return -EINVAL;
472 }
473
474 return 0;
475 }
476
477 void RGWSI_SysObj_Cache::set_enabled(bool status)
478 {
479 cache.set_enabled(status);
480 }
481
482 bool RGWSI_SysObj_Cache::chain_cache_entry(const DoutPrefixProvider *dpp,
483 std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
484 RGWChainedCache::Entry *chained_entry)
485 {
486 return cache.chain_cache_entry(dpp, cache_info_entries, chained_entry);
487 }
488
489 void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
490 {
491 cache.chain_cache(cc);
492 }
493
494 void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache *cc)
495 {
496 cache.unchain_cache(cc);
497 }
498
499 static void cache_list_dump_helper(Formatter* f,
500 const std::string& name,
501 const ceph::real_time mtime,
502 const std::uint64_t size)
503 {
504 f->dump_string("name", name);
505 f->dump_string("mtime", ceph::to_iso_8601(mtime));
506 f->dump_unsigned("size", size);
507 }
508
509 class RGWSI_SysObj_Cache_ASocketHook : public AdminSocketHook {
510 RGWSI_SysObj_Cache *svc;
511
512 static constexpr std::string_view admin_commands[][2] = {
513 { "cache list name=filter,type=CephString,req=false",
514 "cache list [filter_str]: list object cache, possibly matching substrings" },
515 { "cache inspect name=target,type=CephString,req=true",
516 "cache inspect target: print cache element" },
517 { "cache erase name=target,type=CephString,req=true",
518 "cache erase target: erase element from cache" },
519 { "cache zap",
520 "cache zap: erase all elements from cache" }
521 };
522
523 public:
524 RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
525
526 int start();
527 void shutdown();
528
529 int call(std::string_view command, const cmdmap_t& cmdmap,
530 Formatter *f,
531 std::ostream& ss,
532 bufferlist& out) override;
533 };
534
535 int RGWSI_SysObj_Cache_ASocketHook::start()
536 {
537 auto admin_socket = svc->ctx()->get_admin_socket();
538 for (auto cmd : admin_commands) {
539 int r = admin_socket->register_command(cmd[0], this, cmd[1]);
540 if (r < 0) {
541 ldout(svc->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
542 << ")" << dendl;
543 return r;
544 }
545 }
546 return 0;
547 }
548
549 void RGWSI_SysObj_Cache_ASocketHook::shutdown()
550 {
551 auto admin_socket = svc->ctx()->get_admin_socket();
552 admin_socket->unregister_commands(this);
553 }
554
555 int RGWSI_SysObj_Cache_ASocketHook::call(
556 std::string_view command, const cmdmap_t& cmdmap,
557 Formatter *f,
558 std::ostream& ss,
559 bufferlist& out)
560 {
561 if (command == "cache list"sv) {
562 std::optional<std::string> filter;
563 if (auto i = cmdmap.find("filter"); i != cmdmap.cend()) {
564 filter = boost::get<std::string>(i->second);
565 }
566 f->open_array_section("cache_entries");
567 svc->asocket.call_list(filter, f);
568 f->close_section();
569 return 0;
570 } else if (command == "cache inspect"sv) {
571 const auto& target = boost::get<std::string>(cmdmap.at("target"));
572 if (svc->asocket.call_inspect(target, f)) {
573 return 0;
574 } else {
575 ss << "Unable to find entry "s + target + ".\n";
576 return -ENOENT;
577 }
578 } else if (command == "cache erase"sv) {
579 const auto& target = boost::get<std::string>(cmdmap.at("target"));
580 if (svc->asocket.call_erase(target)) {
581 return 0;
582 } else {
583 ss << "Unable to find entry "s + target + ".\n";
584 return -ENOENT;
585 }
586 } else if (command == "cache zap"sv) {
587 svc->asocket.call_zap();
588 return 0;
589 }
590 return -ENOSYS;
591 }
592
593 RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(const DoutPrefixProvider *_dpp, RGWSI_SysObj_Cache *_svc) : dpp(_dpp), svc(_svc)
594 {
595 hook.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc));
596 }
597
598 RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
599 {
600 }
601
602 int RGWSI_SysObj_Cache::ASocketHandler::start()
603 {
604 return hook->start();
605 }
606
607 void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
608 {
609 return hook->shutdown();
610 }
611
612 void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional<std::string>& filter, Formatter* f)
613 {
614 svc->cache.for_each(
615 [&filter, f] (const string& name, const ObjectCacheEntry& entry) {
616 if (!filter || name.find(*filter) != name.npos) {
617 cache_list_dump_helper(f, name, entry.info.meta.mtime,
618 entry.info.meta.size);
619 }
620 });
621 }
622
623 int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string& target, Formatter* f)
624 {
625 if (const auto entry = svc->cache.get(dpp, target)) {
626 f->open_object_section("cache_entry");
627 f->dump_string("name", target.c_str());
628 entry->dump(f);
629 f->close_section();
630 return true;
631 } else {
632 return false;
633 }
634 }
635
636 int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string& target)
637 {
638 return svc->cache.invalidate_remove(dpp, target);
639 }
640
641 int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
642 {
643 svc->cache.invalidate_all();
644 return 0;
645 }