]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/services/svc_sys_obj_cache.cc
9be71f706708c5b17e3817a8b83010243238ae88
[ceph.git] / ceph / src / rgw / services / svc_sys_obj_cache.cc
1
2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab ft=cpp
4
5 #include "common/admin_socket.h"
6
7 #include "svc_sys_obj_cache.h"
8 #include "svc_zone.h"
9 #include "svc_notify.h"
10
11 #include "rgw/rgw_zone.h"
12 #include "rgw/rgw_tools.h"
13
14 #define dout_subsys ceph_subsys_rgw
15
16 class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
17 {
18 RGWSI_SysObj_Cache *svc;
19 public:
20 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
21 int watch_cb(uint64_t notify_id,
22 uint64_t cookie,
23 uint64_t notifier_id,
24 bufferlist& bl) {
25 return svc->watch_cb(notify_id, cookie, notifier_id, bl);
26 }
27
28 void set_enabled(bool status) {
29 svc->set_enabled(status);
30 }
31 };
32
33 int RGWSI_SysObj_Cache::do_start(optional_yield y)
34 {
35 int r = asocket.start();
36 if (r < 0) {
37 return r;
38 }
39
40 r = RGWSI_SysObj_Core::do_start(y);
41 if (r < 0) {
42 return r;
43 }
44
45 r = notify_svc->start(y);
46 if (r < 0) {
47 return r;
48 }
49
50 assert(notify_svc->is_started());
51
52 cb.reset(new RGWSI_SysObj_Cache_CB(this));
53
54 notify_svc->register_watch_cb(cb.get());
55
56 return 0;
57 }
58
59 void RGWSI_SysObj_Cache::shutdown()
60 {
61 asocket.shutdown();
62 RGWSI_SysObj_Core::shutdown();
63 }
64
65 static string normal_name(rgw_pool& pool, const std::string& oid) {
66 std::string buf;
67 buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
68 buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
69 return buf;
70 }
71
72 void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
73 {
74 if (src_obj.size()) {
75 dst_pool = src_pool;
76 dst_obj = src_obj;
77 } else {
78 dst_pool = zone_svc->get_zone_params().domain_root;
79 dst_obj = src_pool.name;
80 }
81 }
82
83
84 int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
85 RGWObjVersionTracker *objv_tracker,
86 const rgw_raw_obj& obj,
87 optional_yield y)
88
89 {
90 rgw_pool pool;
91 string oid;
92 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
93
94 string name = normal_name(pool, oid);
95 cache.remove(name);
96
97 ObjectCacheInfo info;
98 int r = distribute_cache(name, obj, info, REMOVE_OBJ, y);
99 if (r < 0) {
100 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
101 }
102
103 return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj, y);
104 }
105
106 int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
107 RGWSI_SysObj_Obj_GetObjState& read_state,
108 RGWObjVersionTracker *objv_tracker,
109 const rgw_raw_obj& obj,
110 bufferlist *obl, off_t ofs, off_t end,
111 map<string, bufferlist> *attrs,
112 bool raw_attrs,
113 rgw_cache_entry_info *cache_info,
114 boost::optional<obj_version> refresh_version,
115 optional_yield y)
116 {
117 rgw_pool pool;
118 string oid;
119 if (ofs != 0) {
120 return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
121 obj, obl, ofs, end, attrs, raw_attrs,
122 cache_info, refresh_version, y);
123 }
124
125 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
126 string name = normal_name(pool, oid);
127
128 ObjectCacheInfo info;
129
130 uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0);
131 if (objv_tracker)
132 flags |= CACHE_FLAG_OBJV;
133 if (attrs)
134 flags |= CACHE_FLAG_XATTRS;
135
136 int r = cache.get(name, info, flags, cache_info);
137 if (r == 0 &&
138 (!refresh_version || !info.version.compare(&(*refresh_version)))) {
139 if (info.status < 0)
140 return info.status;
141
142 bufferlist& bl = info.data;
143
144 bufferlist::iterator i = bl.begin();
145
146 obl->clear();
147
148 i.copy_all(*obl);
149 if (objv_tracker)
150 objv_tracker->read_version = info.version;
151 if (attrs) {
152 if (raw_attrs) {
153 *attrs = info.xattrs;
154 } else {
155 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
156 }
157 }
158 return obl->length();
159 }
160 if(r == -ENODATA)
161 return -ENOENT;
162
163 map<string, bufferlist> unfiltered_attrset;
164 r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
165 obj, obl, ofs, end,
166 (attrs ? &unfiltered_attrset : nullptr),
167 true, /* cache unfiltered attrs */
168 cache_info,
169 refresh_version, y);
170 if (r < 0) {
171 if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
172 info.status = r;
173 cache.put(name, info, cache_info);
174 }
175 return r;
176 }
177
178 if (obl->length() == end + 1) {
179 /* in this case, most likely object contains more data, we can't cache it */
180 flags &= ~CACHE_FLAG_DATA;
181 } else {
182 bufferptr p(r);
183 bufferlist& bl = info.data;
184 bl.clear();
185 bufferlist::iterator o = obl->begin();
186 o.copy_all(bl);
187 }
188
189 info.status = 0;
190 info.flags = flags;
191 if (objv_tracker) {
192 info.version = objv_tracker->read_version;
193 }
194 if (attrs) {
195 info.xattrs = std::move(unfiltered_attrset);
196 if (raw_attrs) {
197 *attrs = info.xattrs;
198 } else {
199 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
200 }
201 }
202 cache.put(name, info, cache_info);
203 return r;
204 }
205
206 int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj,
207 const char *attr_name,
208 bufferlist *dest,
209 optional_yield y)
210 {
211 rgw_pool pool;
212 string oid;
213
214 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
215 string name = normal_name(pool, oid);
216
217 ObjectCacheInfo info;
218
219 uint32_t flags = CACHE_FLAG_XATTRS;
220
221 int r = cache.get(name, info, flags, nullptr);
222 if (r == 0) {
223 if (info.status < 0)
224 return info.status;
225
226 auto iter = info.xattrs.find(attr_name);
227 if (iter == info.xattrs.end()) {
228 return -ENODATA;
229 }
230
231 *dest = iter->second;
232 return dest->length();
233 } else if (r == -ENODATA) {
234 return -ENOENT;
235 }
236 /* don't try to cache this one */
237 return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest, y);
238 }
239
240 int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj,
241 map<string, bufferlist>& attrs,
242 map<string, bufferlist> *rmattrs,
243 RGWObjVersionTracker *objv_tracker,
244 optional_yield y)
245 {
246 rgw_pool pool;
247 string oid;
248 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
249 ObjectCacheInfo info;
250 info.xattrs = attrs;
251 if (rmattrs) {
252 info.rm_xattrs = *rmattrs;
253 }
254 info.status = 0;
255 info.flags = CACHE_FLAG_MODIFY_XATTRS;
256 int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker, y);
257 string name = normal_name(pool, oid);
258 if (ret >= 0) {
259 if (objv_tracker && objv_tracker->read_version.ver) {
260 info.version = objv_tracker->read_version;
261 info.flags |= CACHE_FLAG_OBJV;
262 }
263 cache.put(name, info, NULL);
264 int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
265 if (r < 0)
266 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
267 } else {
268 cache.remove(name);
269 }
270
271 return ret;
272 }
273
274 int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
275 real_time *pmtime,
276 map<std::string, bufferlist>& attrs,
277 bool exclusive,
278 const bufferlist& data,
279 RGWObjVersionTracker *objv_tracker,
280 real_time set_mtime,
281 optional_yield y)
282 {
283 rgw_pool pool;
284 string oid;
285 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
286 ObjectCacheInfo info;
287 info.xattrs = attrs;
288 info.status = 0;
289 info.data = data;
290 info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
291 ceph::real_time result_mtime;
292 int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs,
293 exclusive, data,
294 objv_tracker, set_mtime, y);
295 if (pmtime) {
296 *pmtime = result_mtime;
297 }
298 if (objv_tracker && objv_tracker->read_version.ver) {
299 info.version = objv_tracker->read_version;
300 info.flags |= CACHE_FLAG_OBJV;
301 }
302 info.meta.mtime = result_mtime;
303 info.meta.size = data.length();
304 string name = normal_name(pool, oid);
305 if (ret >= 0) {
306 cache.put(name, info, NULL);
307 int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
308 if (r < 0)
309 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
310 } else {
311 cache.remove(name);
312 }
313
314 return ret;
315 }
316
317 int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
318 const bufferlist& data,
319 bool exclusive,
320 RGWObjVersionTracker *objv_tracker,
321 optional_yield y)
322 {
323 rgw_pool pool;
324 string oid;
325 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
326
327 ObjectCacheInfo info;
328 info.data = data;
329 info.meta.size = data.length();
330 info.status = 0;
331 info.flags = CACHE_FLAG_DATA;
332
333 int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker, y);
334 string name = normal_name(pool, oid);
335 if (ret >= 0) {
336 if (objv_tracker && objv_tracker->read_version.ver) {
337 info.version = objv_tracker->read_version;
338 info.flags |= CACHE_FLAG_OBJV;
339 }
340 cache.put(name, info, NULL);
341 int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
342 if (r < 0)
343 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
344 } else {
345 cache.remove(name);
346 }
347
348 return ret;
349 }
350
351 int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
352 map<string, bufferlist> *attrs, bufferlist *first_chunk,
353 RGWObjVersionTracker *objv_tracker,
354 optional_yield y)
355 {
356 rgw_pool pool;
357 string oid;
358 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
359
360 string name = normal_name(pool, oid);
361
362 uint64_t size;
363 real_time mtime;
364 uint64_t epoch;
365
366 ObjectCacheInfo info;
367 uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
368 if (objv_tracker)
369 flags |= CACHE_FLAG_OBJV;
370 int r = cache.get(name, info, flags, NULL);
371 if (r == 0) {
372 if (info.status < 0)
373 return info.status;
374
375 size = info.meta.size;
376 mtime = info.meta.mtime;
377 epoch = info.epoch;
378 if (objv_tracker)
379 objv_tracker->read_version = info.version;
380 goto done;
381 }
382 if (r == -ENODATA) {
383 return -ENOENT;
384 }
385 r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs,
386 first_chunk, objv_tracker, y);
387 if (r < 0) {
388 if (r == -ENOENT) {
389 info.status = r;
390 cache.put(name, info, NULL);
391 }
392 return r;
393 }
394 info.status = 0;
395 info.epoch = epoch;
396 info.meta.mtime = mtime;
397 info.meta.size = size;
398 info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
399 if (objv_tracker) {
400 info.flags |= CACHE_FLAG_OBJV;
401 info.version = objv_tracker->read_version;
402 }
403 cache.put(name, info, NULL);
404 done:
405 if (psize)
406 *psize = size;
407 if (pmtime)
408 *pmtime = mtime;
409 if (pepoch)
410 *pepoch = epoch;
411 if (attrs)
412 *attrs = info.xattrs;
413 return 0;
414 }
415
416 int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name,
417 const rgw_raw_obj& obj,
418 ObjectCacheInfo& obj_info, int op,
419 optional_yield y)
420 {
421 RGWCacheNotifyInfo info;
422 info.op = op;
423 info.obj_info = obj_info;
424 info.obj = obj;
425 bufferlist bl;
426 encode(info, bl);
427 return notify_svc->distribute(normal_name, bl, y);
428 }
429
430 int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
431 uint64_t cookie,
432 uint64_t notifier_id,
433 bufferlist& bl)
434 {
435 RGWCacheNotifyInfo info;
436
437 try {
438 auto iter = bl.cbegin();
439 decode(info, iter);
440 } catch (buffer::end_of_buffer& err) {
441 ldout(cct, 0) << "ERROR: got bad notification" << dendl;
442 return -EIO;
443 } catch (buffer::error& err) {
444 ldout(cct, 0) << "ERROR: buffer::error" << dendl;
445 return -EIO;
446 }
447
448 rgw_pool pool;
449 string oid;
450 normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
451 string name = normal_name(pool, oid);
452
453 switch (info.op) {
454 case UPDATE_OBJ:
455 cache.put(name, info.obj_info, NULL);
456 break;
457 case REMOVE_OBJ:
458 cache.remove(name);
459 break;
460 default:
461 ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
462 return -EINVAL;
463 }
464
465 return 0;
466 }
467
468 void RGWSI_SysObj_Cache::set_enabled(bool status)
469 {
470 cache.set_enabled(status);
471 }
472
473 bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
474 RGWChainedCache::Entry *chained_entry)
475 {
476 return cache.chain_cache_entry(cache_info_entries, chained_entry);
477 }
478
479 void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
480 {
481 cache.chain_cache(cc);
482 }
483
484 void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache *cc)
485 {
486 cache.unchain_cache(cc);
487 }
488
489 static void cache_list_dump_helper(Formatter* f,
490 const std::string& name,
491 const ceph::real_time mtime,
492 const std::uint64_t size)
493 {
494 f->dump_string("name", name);
495 f->dump_string("mtime", ceph::to_iso_8601(mtime));
496 f->dump_unsigned("size", size);
497 }
498
499 class RGWSI_SysObj_Cache_ASocketHook : public AdminSocketHook {
500 RGWSI_SysObj_Cache *svc;
501
502 static constexpr std::string_view admin_commands[][2] = {
503 { "cache list name=filter,type=CephString,req=false",
504 "cache list [filter_str]: list object cache, possibly matching substrings" },
505 { "cache inspect name=target,type=CephString,req=true",
506 "cache inspect target: print cache element" },
507 { "cache erase name=target,type=CephString,req=true",
508 "cache erase target: erase element from cache" },
509 { "cache zap",
510 "cache zap: erase all elements from cache" }
511 };
512
513 public:
514 RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
515
516 int start();
517 void shutdown();
518
519 int call(std::string_view command, const cmdmap_t& cmdmap,
520 Formatter *f,
521 std::ostream& ss,
522 bufferlist& out) override;
523 };
524
525 int RGWSI_SysObj_Cache_ASocketHook::start()
526 {
527 auto admin_socket = svc->ctx()->get_admin_socket();
528 for (auto cmd : admin_commands) {
529 int r = admin_socket->register_command(cmd[0], this, cmd[1]);
530 if (r < 0) {
531 ldout(svc->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
532 << ")" << dendl;
533 return r;
534 }
535 }
536 return 0;
537 }
538
539 void RGWSI_SysObj_Cache_ASocketHook::shutdown()
540 {
541 auto admin_socket = svc->ctx()->get_admin_socket();
542 admin_socket->unregister_commands(this);
543 }
544
545 int RGWSI_SysObj_Cache_ASocketHook::call(
546 std::string_view command, const cmdmap_t& cmdmap,
547 Formatter *f,
548 std::ostream& ss,
549 bufferlist& out)
550 {
551 if (command == "cache list"sv) {
552 std::optional<std::string> filter;
553 if (auto i = cmdmap.find("filter"); i != cmdmap.cend()) {
554 filter = boost::get<std::string>(i->second);
555 }
556 f->open_array_section("cache_entries");
557 svc->asocket.call_list(filter, f);
558 f->close_section();
559 return 0;
560 } else if (command == "cache inspect"sv) {
561 const auto& target = boost::get<std::string>(cmdmap.at("target"));
562 if (svc->asocket.call_inspect(target, f)) {
563 return 0;
564 } else {
565 ss << "Unable to find entry "s + target + ".\n";
566 return -ENOENT;
567 }
568 } else if (command == "cache erase"sv) {
569 const auto& target = boost::get<std::string>(cmdmap.at("target"));
570 if (svc->asocket.call_erase(target)) {
571 return 0;
572 } else {
573 ss << "Unable to find entry "s + target + ".\n";
574 return -ENOENT;
575 }
576 } else if (command == "cache zap"sv) {
577 svc->asocket.call_zap();
578 return 0;
579 }
580 return -ENOSYS;
581 }
582
583 RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(RGWSI_SysObj_Cache *_svc) : svc(_svc)
584 {
585 hook.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc));
586 }
587
588 RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
589 {
590 }
591
592 int RGWSI_SysObj_Cache::ASocketHandler::start()
593 {
594 return hook->start();
595 }
596
597 void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
598 {
599 return hook->shutdown();
600 }
601
602 void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional<std::string>& filter, Formatter* f)
603 {
604 svc->cache.for_each(
605 [&filter, f] (const string& name, const ObjectCacheEntry& entry) {
606 if (!filter || name.find(*filter) != name.npos) {
607 cache_list_dump_helper(f, name, entry.info.meta.mtime,
608 entry.info.meta.size);
609 }
610 });
611 }
612
613 int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string& target, Formatter* f)
614 {
615 if (const auto entry = svc->cache.get(target)) {
616 f->open_object_section("cache_entry");
617 f->dump_string("name", target.c_str());
618 entry->dump(f);
619 f->close_section();
620 return true;
621 } else {
622 return false;
623 }
624 }
625
626 int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string& target)
627 {
628 return svc->cache.remove(target);
629 }
630
631 int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
632 {
633 svc->cache.invalidate_all();
634 return 0;
635 }