]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_sys_obj_cache.cc
import ceph 15.2.14
[ceph.git] / ceph / src / rgw / services / svc_sys_obj_cache.cc
CommitLineData
9f95a23c
TL
1
2// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3// vim: ts=8 sw=2 smarttab ft=cpp
4
5#include "common/admin_socket.h"
6
11fdf7f2
TL
7#include "svc_sys_obj_cache.h"
8#include "svc_zone.h"
9#include "svc_notify.h"
10
11#include "rgw/rgw_zone.h"
12#include "rgw/rgw_tools.h"
13
14#define dout_subsys ceph_subsys_rgw
15
16class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
17{
18 RGWSI_SysObj_Cache *svc;
19public:
20 RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
21 int watch_cb(uint64_t notify_id,
22 uint64_t cookie,
23 uint64_t notifier_id,
24 bufferlist& bl) {
25 return svc->watch_cb(notify_id, cookie, notifier_id, bl);
26 }
27
28 void set_enabled(bool status) {
29 svc->set_enabled(status);
30 }
31};
32
33int RGWSI_SysObj_Cache::do_start()
34{
9f95a23c
TL
35 int r = asocket.start();
36 if (r < 0) {
37 return r;
38 }
39
40 r = RGWSI_SysObj_Core::do_start();
11fdf7f2
TL
41 if (r < 0) {
42 return r;
43 }
44
45 r = notify_svc->start();
46 if (r < 0) {
47 return r;
48 }
49
50 assert(notify_svc->is_started());
51
52 cb.reset(new RGWSI_SysObj_Cache_CB(this));
53
54 notify_svc->register_watch_cb(cb.get());
55
56 return 0;
57}
58
9f95a23c
TL
59void RGWSI_SysObj_Cache::shutdown()
60{
61 asocket.shutdown();
62 RGWSI_SysObj_Core::shutdown();
63}
64
11fdf7f2
TL
65static string normal_name(rgw_pool& pool, const std::string& oid) {
66 std::string buf;
67 buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
68 buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
69 return buf;
70}
71
72void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
73{
74 if (src_obj.size()) {
75 dst_pool = src_pool;
76 dst_obj = src_obj;
77 } else {
78 dst_pool = zone_svc->get_zone_params().domain_root;
79 dst_obj = src_pool.name;
80 }
81}
82
83
84int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
85 RGWObjVersionTracker *objv_tracker,
9f95a23c
TL
86 const rgw_raw_obj& obj,
87 optional_yield y)
11fdf7f2
TL
88
89{
90 rgw_pool pool;
91 string oid;
92 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
93
94 string name = normal_name(pool, oid);
95 cache.remove(name);
96
97 ObjectCacheInfo info;
9f95a23c 98 int r = distribute_cache(name, obj, info, REMOVE_OBJ, y);
11fdf7f2
TL
99 if (r < 0) {
100 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
101 }
102
9f95a23c 103 return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj, y);
11fdf7f2
TL
104}
105
106int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
9f95a23c 107 RGWSI_SysObj_Obj_GetObjState& read_state,
11fdf7f2
TL
108 RGWObjVersionTracker *objv_tracker,
109 const rgw_raw_obj& obj,
110 bufferlist *obl, off_t ofs, off_t end,
111 map<string, bufferlist> *attrs,
112 bool raw_attrs,
113 rgw_cache_entry_info *cache_info,
9f95a23c
TL
114 boost::optional<obj_version> refresh_version,
115 optional_yield y)
11fdf7f2
TL
116{
117 rgw_pool pool;
118 string oid;
119 if (ofs != 0) {
120 return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
9f95a23c
TL
121 obj, obl, ofs, end, attrs, raw_attrs,
122 cache_info, refresh_version, y);
11fdf7f2
TL
123 }
124
125 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
126 string name = normal_name(pool, oid);
127
128 ObjectCacheInfo info;
129
130 uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0);
131 if (objv_tracker)
132 flags |= CACHE_FLAG_OBJV;
133 if (attrs)
134 flags |= CACHE_FLAG_XATTRS;
f6b5b4d7
TL
135
136 int r = cache.get(name, info, flags, cache_info);
137 if (r == 0 &&
11fdf7f2
TL
138 (!refresh_version || !info.version.compare(&(*refresh_version)))) {
139 if (info.status < 0)
140 return info.status;
141
142 bufferlist& bl = info.data;
143
144 bufferlist::iterator i = bl.begin();
145
146 obl->clear();
147
148 i.copy_all(*obl);
149 if (objv_tracker)
150 objv_tracker->read_version = info.version;
151 if (attrs) {
152 if (raw_attrs) {
153 *attrs = info.xattrs;
154 } else {
155 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
156 }
157 }
158 return obl->length();
159 }
f6b5b4d7
TL
160 if(r == -ENODATA)
161 return -ENOENT;
11fdf7f2
TL
162
163 map<string, bufferlist> unfiltered_attrset;
f6b5b4d7 164 r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
11fdf7f2
TL
165 obj, obl, ofs, end,
166 (attrs ? &unfiltered_attrset : nullptr),
167 true, /* cache unfiltered attrs */
168 cache_info,
9f95a23c 169 refresh_version, y);
11fdf7f2
TL
170 if (r < 0) {
171 if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
172 info.status = r;
173 cache.put(name, info, cache_info);
174 }
175 return r;
176 }
177
178 if (obl->length() == end + 1) {
179 /* in this case, most likely object contains more data, we can't cache it */
180 flags &= ~CACHE_FLAG_DATA;
181 } else {
182 bufferptr p(r);
183 bufferlist& bl = info.data;
184 bl.clear();
185 bufferlist::iterator o = obl->begin();
186 o.copy_all(bl);
187 }
188
189 info.status = 0;
190 info.flags = flags;
191 if (objv_tracker) {
192 info.version = objv_tracker->read_version;
193 }
194 if (attrs) {
195 info.xattrs = std::move(unfiltered_attrset);
196 if (raw_attrs) {
197 *attrs = info.xattrs;
198 } else {
199 rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
200 }
201 }
202 cache.put(name, info, cache_info);
203 return r;
204}
205
206int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj,
9f95a23c
TL
207 const char *attr_name,
208 bufferlist *dest,
209 optional_yield y)
11fdf7f2
TL
210{
211 rgw_pool pool;
212 string oid;
213
214 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
215 string name = normal_name(pool, oid);
216
217 ObjectCacheInfo info;
218
219 uint32_t flags = CACHE_FLAG_XATTRS;
220
f6b5b4d7
TL
221 int r = cache.get(name, info, flags, nullptr);
222 if (r == 0) {
11fdf7f2
TL
223 if (info.status < 0)
224 return info.status;
225
226 auto iter = info.xattrs.find(attr_name);
227 if (iter == info.xattrs.end()) {
228 return -ENODATA;
229 }
230
231 *dest = iter->second;
232 return dest->length();
f6b5b4d7
TL
233 } else if (r == -ENODATA) {
234 return -ENOENT;
11fdf7f2
TL
235 }
236 /* don't try to cache this one */
9f95a23c 237 return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest, y);
11fdf7f2
TL
238}
239
240int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj,
241 map<string, bufferlist>& attrs,
242 map<string, bufferlist> *rmattrs,
9f95a23c
TL
243 RGWObjVersionTracker *objv_tracker,
244 optional_yield y)
11fdf7f2
TL
245{
246 rgw_pool pool;
247 string oid;
248 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
249 ObjectCacheInfo info;
250 info.xattrs = attrs;
251 if (rmattrs) {
252 info.rm_xattrs = *rmattrs;
253 }
254 info.status = 0;
255 info.flags = CACHE_FLAG_MODIFY_XATTRS;
9f95a23c 256 int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker, y);
11fdf7f2
TL
257 string name = normal_name(pool, oid);
258 if (ret >= 0) {
f91f0fd5
TL
259 if (objv_tracker && objv_tracker->read_version.ver) {
260 info.version = objv_tracker->read_version;
261 info.flags |= CACHE_FLAG_OBJV;
262 }
11fdf7f2 263 cache.put(name, info, NULL);
9f95a23c 264 int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
11fdf7f2
TL
265 if (r < 0)
266 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
267 } else {
268 cache.remove(name);
269 }
270
271 return ret;
272}
273
274int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj,
275 real_time *pmtime,
276 map<std::string, bufferlist>& attrs,
277 bool exclusive,
278 const bufferlist& data,
279 RGWObjVersionTracker *objv_tracker,
9f95a23c
TL
280 real_time set_mtime,
281 optional_yield y)
11fdf7f2
TL
282{
283 rgw_pool pool;
284 string oid;
285 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
286 ObjectCacheInfo info;
287 info.xattrs = attrs;
288 info.status = 0;
289 info.data = data;
290 info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
11fdf7f2
TL
291 ceph::real_time result_mtime;
292 int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs,
9f95a23c
TL
293 exclusive, data,
294 objv_tracker, set_mtime, y);
11fdf7f2
TL
295 if (pmtime) {
296 *pmtime = result_mtime;
297 }
f91f0fd5
TL
298 if (objv_tracker && objv_tracker->read_version.ver) {
299 info.version = objv_tracker->read_version;
300 info.flags |= CACHE_FLAG_OBJV;
301 }
11fdf7f2
TL
302 info.meta.mtime = result_mtime;
303 info.meta.size = data.length();
304 string name = normal_name(pool, oid);
305 if (ret >= 0) {
306 cache.put(name, info, NULL);
adb31ebb
TL
307 int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
308 if (r < 0)
309 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
11fdf7f2
TL
310 } else {
311 cache.remove(name);
312 }
313
314 return ret;
315}
316
317int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj,
318 const bufferlist& data,
319 bool exclusive,
9f95a23c
TL
320 RGWObjVersionTracker *objv_tracker,
321 optional_yield y)
11fdf7f2
TL
322{
323 rgw_pool pool;
324 string oid;
325 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
326
327 ObjectCacheInfo info;
328 info.data = data;
329 info.meta.size = data.length();
330 info.status = 0;
331 info.flags = CACHE_FLAG_DATA;
332
9f95a23c 333 int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker, y);
11fdf7f2
TL
334 string name = normal_name(pool, oid);
335 if (ret >= 0) {
f91f0fd5
TL
336 if (objv_tracker && objv_tracker->read_version.ver) {
337 info.version = objv_tracker->read_version;
338 info.flags |= CACHE_FLAG_OBJV;
339 }
11fdf7f2 340 cache.put(name, info, NULL);
9f95a23c 341 int r = distribute_cache(name, obj, info, UPDATE_OBJ, y);
11fdf7f2
TL
342 if (r < 0)
343 ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
344 } else {
345 cache.remove(name);
346 }
347
348 return ret;
349}
350
351int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
352 map<string, bufferlist> *attrs, bufferlist *first_chunk,
9f95a23c
TL
353 RGWObjVersionTracker *objv_tracker,
354 optional_yield y)
11fdf7f2
TL
355{
356 rgw_pool pool;
357 string oid;
358 normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
359
360 string name = normal_name(pool, oid);
361
362 uint64_t size;
363 real_time mtime;
364 uint64_t epoch;
365
366 ObjectCacheInfo info;
367 uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
368 if (objv_tracker)
369 flags |= CACHE_FLAG_OBJV;
370 int r = cache.get(name, info, flags, NULL);
371 if (r == 0) {
372 if (info.status < 0)
373 return info.status;
374
375 size = info.meta.size;
376 mtime = info.meta.mtime;
377 epoch = info.epoch;
378 if (objv_tracker)
379 objv_tracker->read_version = info.version;
380 goto done;
381 }
f6b5b4d7
TL
382 if (r == -ENODATA) {
383 return -ENOENT;
384 }
9f95a23c
TL
385 r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs,
386 first_chunk, objv_tracker, y);
11fdf7f2
TL
387 if (r < 0) {
388 if (r == -ENOENT) {
389 info.status = r;
390 cache.put(name, info, NULL);
391 }
392 return r;
393 }
394 info.status = 0;
395 info.epoch = epoch;
396 info.meta.mtime = mtime;
397 info.meta.size = size;
398 info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
399 if (objv_tracker) {
400 info.flags |= CACHE_FLAG_OBJV;
401 info.version = objv_tracker->read_version;
402 }
403 cache.put(name, info, NULL);
404done:
405 if (psize)
406 *psize = size;
407 if (pmtime)
408 *pmtime = mtime;
409 if (pepoch)
410 *pepoch = epoch;
411 if (attrs)
412 *attrs = info.xattrs;
413 return 0;
414}
415
9f95a23c
TL
416int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name,
417 const rgw_raw_obj& obj,
418 ObjectCacheInfo& obj_info, int op,
419 optional_yield y)
11fdf7f2
TL
420{
421 RGWCacheNotifyInfo info;
11fdf7f2 422 info.op = op;
11fdf7f2
TL
423 info.obj_info = obj_info;
424 info.obj = obj;
ec96510d 425 return notify_svc->distribute(normal_name, info, y);
11fdf7f2
TL
426}
427
428int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
429 uint64_t cookie,
430 uint64_t notifier_id,
431 bufferlist& bl)
432{
433 RGWCacheNotifyInfo info;
434
435 try {
436 auto iter = bl.cbegin();
437 decode(info, iter);
438 } catch (buffer::end_of_buffer& err) {
439 ldout(cct, 0) << "ERROR: got bad notification" << dendl;
440 return -EIO;
441 } catch (buffer::error& err) {
442 ldout(cct, 0) << "ERROR: buffer::error" << dendl;
443 return -EIO;
444 }
445
446 rgw_pool pool;
447 string oid;
448 normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
449 string name = normal_name(pool, oid);
450
451 switch (info.op) {
452 case UPDATE_OBJ:
453 cache.put(name, info.obj_info, NULL);
454 break;
455 case REMOVE_OBJ:
456 cache.remove(name);
457 break;
458 default:
459 ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
460 return -EINVAL;
461 }
462
463 return 0;
464}
465
466void RGWSI_SysObj_Cache::set_enabled(bool status)
467{
468 cache.set_enabled(status);
469}
470
471bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
472 RGWChainedCache::Entry *chained_entry)
473{
474 return cache.chain_cache_entry(cache_info_entries, chained_entry);
475}
476
477void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
478{
479 cache.chain_cache(cc);
480}
481
482void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache *cc)
483{
484 cache.unchain_cache(cc);
485}
486
487static void cache_list_dump_helper(Formatter* f,
488 const std::string& name,
489 const ceph::real_time mtime,
490 const std::uint64_t size)
491{
492 f->dump_string("name", name);
493 f->dump_string("mtime", ceph::to_iso_8601(mtime));
494 f->dump_unsigned("size", size);
495}
496
9f95a23c
TL
497class RGWSI_SysObj_Cache_ASocketHook : public AdminSocketHook {
498 RGWSI_SysObj_Cache *svc;
499
500 static constexpr const char* admin_commands[4][3] = {
501 { "cache list",
502 "cache list name=filter,type=CephString,req=false",
503 "cache list [filter_str]: list object cache, possibly matching substrings" },
504 { "cache inspect name=target,type=CephString,req=true",
505 "cache inspect target: print cache element" },
506 { "cache erase name=target,type=CephString,req=true",
507 "cache erase target: erase element from cache" },
508 { "cache zap",
509 "cache zap: erase all elements from cache" }
510 };
511
512public:
513 RGWSI_SysObj_Cache_ASocketHook(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
514
515 int start();
516 void shutdown();
517
518 int call(std::string_view command, const cmdmap_t& cmdmap,
519 Formatter *f,
520 std::ostream& ss,
521 bufferlist& out) override;
522};
523
524int RGWSI_SysObj_Cache_ASocketHook::start()
525{
526 auto admin_socket = svc->ctx()->get_admin_socket();
527 for (auto cmd : admin_commands) {
528 int r = admin_socket->register_command(cmd[0], this, cmd[1]);
529 if (r < 0) {
530 ldout(svc->ctx(), 0) << "ERROR: fail to register admin socket command (r=" << r
531 << ")" << dendl;
532 return r;
533 }
534 }
535 return 0;
536}
537
538void RGWSI_SysObj_Cache_ASocketHook::shutdown()
539{
540 auto admin_socket = svc->ctx()->get_admin_socket();
541 admin_socket->unregister_commands(this);
542}
543
544int RGWSI_SysObj_Cache_ASocketHook::call(
545 std::string_view command, const cmdmap_t& cmdmap,
546 Formatter *f,
547 std::ostream& ss,
548 bufferlist& out)
549{
550 if (command == "cache list"sv) {
551 std::optional<std::string> filter;
552 if (auto i = cmdmap.find("filter"); i != cmdmap.cend()) {
553 filter = boost::get<std::string>(i->second);
554 }
555 f->open_array_section("cache_entries");
556 svc->asocket.call_list(filter, f);
557 f->close_section();
558 return 0;
559 } else if (command == "cache inspect"sv) {
560 const auto& target = boost::get<std::string>(cmdmap.at("target"));
561 if (svc->asocket.call_inspect(target, f)) {
562 return 0;
563 } else {
564 ss << "Unable to find entry "s + target + ".\n";
565 return -ENOENT;
566 }
567 } else if (command == "cache erase"sv) {
568 const auto& target = boost::get<std::string>(cmdmap.at("target"));
569 if (svc->asocket.call_erase(target)) {
570 return 0;
571 } else {
572 ss << "Unable to find entry "s + target + ".\n";
573 return -ENOENT;
574 }
575 } else if (command == "cache zap"sv) {
576 svc->asocket.call_zap();
577 return 0;
578 }
579 return -ENOSYS;
580}
581
582RGWSI_SysObj_Cache::ASocketHandler::ASocketHandler(RGWSI_SysObj_Cache *_svc) : svc(_svc)
583{
584 hook.reset(new RGWSI_SysObj_Cache_ASocketHook(_svc));
585}
586
587RGWSI_SysObj_Cache::ASocketHandler::~ASocketHandler()
588{
589}
590
591int RGWSI_SysObj_Cache::ASocketHandler::start()
592{
593 return hook->start();
594}
595
596void RGWSI_SysObj_Cache::ASocketHandler::shutdown()
597{
598 return hook->shutdown();
599}
600
601void RGWSI_SysObj_Cache::ASocketHandler::call_list(const std::optional<std::string>& filter, Formatter* f)
11fdf7f2 602{
9f95a23c
TL
603 svc->cache.for_each(
604 [&filter, f] (const string& name, const ObjectCacheEntry& entry) {
11fdf7f2
TL
605 if (!filter || name.find(*filter) != name.npos) {
606 cache_list_dump_helper(f, name, entry.info.meta.mtime,
607 entry.info.meta.size);
608 }
609 });
610}
611
9f95a23c 612int RGWSI_SysObj_Cache::ASocketHandler::call_inspect(const std::string& target, Formatter* f)
11fdf7f2 613{
9f95a23c 614 if (const auto entry = svc->cache.get(target)) {
11fdf7f2
TL
615 f->open_object_section("cache_entry");
616 f->dump_string("name", target.c_str());
617 entry->dump(f);
618 f->close_section();
619 return true;
620 } else {
621 return false;
622 }
623}
624
9f95a23c 625int RGWSI_SysObj_Cache::ASocketHandler::call_erase(const std::string& target)
11fdf7f2 626{
9f95a23c 627 return svc->cache.remove(target);
11fdf7f2
TL
628}
629
9f95a23c 630int RGWSI_SysObj_Cache::ASocketHandler::call_zap()
11fdf7f2 631{
9f95a23c 632 svc->cache.invalidate_all();
11fdf7f2
TL
633 return 0;
634}