]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | #include "svc_sys_obj_cache.h" |
2 | #include "svc_zone.h" | |
3 | #include "svc_notify.h" | |
4 | ||
5 | #include "rgw/rgw_zone.h" | |
6 | #include "rgw/rgw_tools.h" | |
7 | ||
8 | #define dout_subsys ceph_subsys_rgw | |
9 | ||
10 | class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB | |
11 | { | |
12 | RGWSI_SysObj_Cache *svc; | |
13 | public: | |
14 | RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {} | |
15 | int watch_cb(uint64_t notify_id, | |
16 | uint64_t cookie, | |
17 | uint64_t notifier_id, | |
18 | bufferlist& bl) { | |
19 | return svc->watch_cb(notify_id, cookie, notifier_id, bl); | |
20 | } | |
21 | ||
22 | void set_enabled(bool status) { | |
23 | svc->set_enabled(status); | |
24 | } | |
25 | }; | |
26 | ||
27 | int RGWSI_SysObj_Cache::do_start() | |
28 | { | |
29 | int r = RGWSI_SysObj_Core::do_start(); | |
30 | if (r < 0) { | |
31 | return r; | |
32 | } | |
33 | ||
34 | r = notify_svc->start(); | |
35 | if (r < 0) { | |
36 | return r; | |
37 | } | |
38 | ||
39 | assert(notify_svc->is_started()); | |
40 | ||
41 | cb.reset(new RGWSI_SysObj_Cache_CB(this)); | |
42 | ||
43 | notify_svc->register_watch_cb(cb.get()); | |
44 | ||
45 | return 0; | |
46 | } | |
47 | ||
48 | static string normal_name(rgw_pool& pool, const std::string& oid) { | |
49 | std::string buf; | |
50 | buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2); | |
51 | buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid); | |
52 | return buf; | |
53 | } | |
54 | ||
55 | void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj) | |
56 | { | |
57 | if (src_obj.size()) { | |
58 | dst_pool = src_pool; | |
59 | dst_obj = src_obj; | |
60 | } else { | |
61 | dst_pool = zone_svc->get_zone_params().domain_root; | |
62 | dst_obj = src_pool.name; | |
63 | } | |
64 | } | |
65 | ||
66 | ||
67 | int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx, | |
68 | RGWObjVersionTracker *objv_tracker, | |
69 | const rgw_raw_obj& obj) | |
70 | ||
71 | { | |
72 | rgw_pool pool; | |
73 | string oid; | |
74 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
75 | ||
76 | string name = normal_name(pool, oid); | |
77 | cache.remove(name); | |
78 | ||
79 | ObjectCacheInfo info; | |
80 | int r = distribute_cache(name, obj, info, REMOVE_OBJ); | |
81 | if (r < 0) { | |
82 | ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl; | |
83 | } | |
84 | ||
85 | return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj); | |
86 | } | |
87 | ||
88 | int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx, | |
89 | GetObjState& read_state, | |
90 | RGWObjVersionTracker *objv_tracker, | |
91 | const rgw_raw_obj& obj, | |
92 | bufferlist *obl, off_t ofs, off_t end, | |
93 | map<string, bufferlist> *attrs, | |
94 | bool raw_attrs, | |
95 | rgw_cache_entry_info *cache_info, | |
96 | boost::optional<obj_version> refresh_version) | |
97 | { | |
98 | rgw_pool pool; | |
99 | string oid; | |
100 | if (ofs != 0) { | |
101 | return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker, | |
102 | obj, obl, ofs, end, attrs, raw_attrs, | |
103 | cache_info, refresh_version); | |
104 | } | |
105 | ||
106 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
107 | string name = normal_name(pool, oid); | |
108 | ||
109 | ObjectCacheInfo info; | |
110 | ||
111 | uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0); | |
112 | if (objv_tracker) | |
113 | flags |= CACHE_FLAG_OBJV; | |
114 | if (attrs) | |
115 | flags |= CACHE_FLAG_XATTRS; | |
116 | ||
117 | if ((cache.get(name, info, flags, cache_info) == 0) && | |
118 | (!refresh_version || !info.version.compare(&(*refresh_version)))) { | |
119 | if (info.status < 0) | |
120 | return info.status; | |
121 | ||
122 | bufferlist& bl = info.data; | |
123 | ||
124 | bufferlist::iterator i = bl.begin(); | |
125 | ||
126 | obl->clear(); | |
127 | ||
128 | i.copy_all(*obl); | |
129 | if (objv_tracker) | |
130 | objv_tracker->read_version = info.version; | |
131 | if (attrs) { | |
132 | if (raw_attrs) { | |
133 | *attrs = info.xattrs; | |
134 | } else { | |
135 | rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs); | |
136 | } | |
137 | } | |
138 | return obl->length(); | |
139 | } | |
140 | ||
141 | map<string, bufferlist> unfiltered_attrset; | |
142 | int r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker, | |
143 | obj, obl, ofs, end, | |
144 | (attrs ? &unfiltered_attrset : nullptr), | |
145 | true, /* cache unfiltered attrs */ | |
146 | cache_info, | |
147 | refresh_version); | |
148 | if (r < 0) { | |
149 | if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors | |
150 | info.status = r; | |
151 | cache.put(name, info, cache_info); | |
152 | } | |
153 | return r; | |
154 | } | |
155 | ||
156 | if (obl->length() == end + 1) { | |
157 | /* in this case, most likely object contains more data, we can't cache it */ | |
158 | flags &= ~CACHE_FLAG_DATA; | |
159 | } else { | |
160 | bufferptr p(r); | |
161 | bufferlist& bl = info.data; | |
162 | bl.clear(); | |
163 | bufferlist::iterator o = obl->begin(); | |
164 | o.copy_all(bl); | |
165 | } | |
166 | ||
167 | info.status = 0; | |
168 | info.flags = flags; | |
169 | if (objv_tracker) { | |
170 | info.version = objv_tracker->read_version; | |
171 | } | |
172 | if (attrs) { | |
173 | info.xattrs = std::move(unfiltered_attrset); | |
174 | if (raw_attrs) { | |
175 | *attrs = info.xattrs; | |
176 | } else { | |
177 | rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs); | |
178 | } | |
179 | } | |
180 | cache.put(name, info, cache_info); | |
181 | return r; | |
182 | } | |
183 | ||
184 | int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj, | |
185 | const char *attr_name, | |
186 | bufferlist *dest) | |
187 | { | |
188 | rgw_pool pool; | |
189 | string oid; | |
190 | ||
191 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
192 | string name = normal_name(pool, oid); | |
193 | ||
194 | ObjectCacheInfo info; | |
195 | ||
196 | uint32_t flags = CACHE_FLAG_XATTRS; | |
197 | ||
198 | if (cache.get(name, info, flags, nullptr) == 0) { | |
199 | if (info.status < 0) | |
200 | return info.status; | |
201 | ||
202 | auto iter = info.xattrs.find(attr_name); | |
203 | if (iter == info.xattrs.end()) { | |
204 | return -ENODATA; | |
205 | } | |
206 | ||
207 | *dest = iter->second; | |
208 | return dest->length(); | |
209 | } | |
210 | /* don't try to cache this one */ | |
211 | return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest); | |
212 | } | |
213 | ||
214 | int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj, | |
215 | map<string, bufferlist>& attrs, | |
216 | map<string, bufferlist> *rmattrs, | |
217 | RGWObjVersionTracker *objv_tracker) | |
218 | { | |
219 | rgw_pool pool; | |
220 | string oid; | |
221 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
222 | ObjectCacheInfo info; | |
223 | info.xattrs = attrs; | |
224 | if (rmattrs) { | |
225 | info.rm_xattrs = *rmattrs; | |
226 | } | |
227 | info.status = 0; | |
228 | info.flags = CACHE_FLAG_MODIFY_XATTRS; | |
229 | if (objv_tracker) { | |
230 | info.version = objv_tracker->write_version; | |
231 | info.flags |= CACHE_FLAG_OBJV; | |
232 | } | |
233 | int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker); | |
234 | string name = normal_name(pool, oid); | |
235 | if (ret >= 0) { | |
236 | cache.put(name, info, NULL); | |
237 | int r = distribute_cache(name, obj, info, UPDATE_OBJ); | |
238 | if (r < 0) | |
239 | ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; | |
240 | } else { | |
241 | cache.remove(name); | |
242 | } | |
243 | ||
244 | return ret; | |
245 | } | |
246 | ||
247 | int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj, | |
248 | real_time *pmtime, | |
249 | map<std::string, bufferlist>& attrs, | |
250 | bool exclusive, | |
251 | const bufferlist& data, | |
252 | RGWObjVersionTracker *objv_tracker, | |
253 | real_time set_mtime) | |
254 | { | |
255 | rgw_pool pool; | |
256 | string oid; | |
257 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
258 | ObjectCacheInfo info; | |
259 | info.xattrs = attrs; | |
260 | info.status = 0; | |
261 | info.data = data; | |
262 | info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META; | |
263 | if (objv_tracker) { | |
264 | info.version = objv_tracker->write_version; | |
265 | info.flags |= CACHE_FLAG_OBJV; | |
266 | } | |
267 | ceph::real_time result_mtime; | |
268 | int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs, | |
269 | exclusive, data, | |
270 | objv_tracker, set_mtime); | |
271 | if (pmtime) { | |
272 | *pmtime = result_mtime; | |
273 | } | |
274 | info.meta.mtime = result_mtime; | |
275 | info.meta.size = data.length(); | |
276 | string name = normal_name(pool, oid); | |
277 | if (ret >= 0) { | |
278 | cache.put(name, info, NULL); | |
279 | // Only distribute the cache information if we did not just create | |
280 | // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies | |
281 | // PUT_OBJ_CREATE. Generally speaking, when successfully creating | |
282 | // a system object with the exclusive flag it is not necessary to | |
283 | // call distribute_cache, as a) it's unclear whether other RGWs | |
284 | // will need that system object in the near-term and b) it | |
285 | // generates additional network traffic. | |
286 | if (!exclusive) { | |
287 | int r = distribute_cache(name, obj, info, UPDATE_OBJ); | |
288 | if (r < 0) | |
289 | ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; | |
290 | } | |
291 | } else { | |
292 | cache.remove(name); | |
293 | } | |
294 | ||
295 | return ret; | |
296 | } | |
297 | ||
298 | int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj, | |
299 | const bufferlist& data, | |
300 | bool exclusive, | |
301 | RGWObjVersionTracker *objv_tracker) | |
302 | { | |
303 | rgw_pool pool; | |
304 | string oid; | |
305 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
306 | ||
307 | ObjectCacheInfo info; | |
308 | info.data = data; | |
309 | info.meta.size = data.length(); | |
310 | info.status = 0; | |
311 | info.flags = CACHE_FLAG_DATA; | |
312 | ||
313 | if (objv_tracker) { | |
314 | info.version = objv_tracker->write_version; | |
315 | info.flags |= CACHE_FLAG_OBJV; | |
316 | } | |
317 | int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker); | |
318 | string name = normal_name(pool, oid); | |
319 | if (ret >= 0) { | |
320 | cache.put(name, info, NULL); | |
321 | int r = distribute_cache(name, obj, info, UPDATE_OBJ); | |
322 | if (r < 0) | |
323 | ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; | |
324 | } else { | |
325 | cache.remove(name); | |
326 | } | |
327 | ||
328 | return ret; | |
329 | } | |
330 | ||
331 | int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch, | |
332 | map<string, bufferlist> *attrs, bufferlist *first_chunk, | |
333 | RGWObjVersionTracker *objv_tracker) | |
334 | { | |
335 | rgw_pool pool; | |
336 | string oid; | |
337 | normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); | |
338 | ||
339 | string name = normal_name(pool, oid); | |
340 | ||
341 | uint64_t size; | |
342 | real_time mtime; | |
343 | uint64_t epoch; | |
344 | ||
345 | ObjectCacheInfo info; | |
346 | uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; | |
347 | if (objv_tracker) | |
348 | flags |= CACHE_FLAG_OBJV; | |
349 | int r = cache.get(name, info, flags, NULL); | |
350 | if (r == 0) { | |
351 | if (info.status < 0) | |
352 | return info.status; | |
353 | ||
354 | size = info.meta.size; | |
355 | mtime = info.meta.mtime; | |
356 | epoch = info.epoch; | |
357 | if (objv_tracker) | |
358 | objv_tracker->read_version = info.version; | |
359 | goto done; | |
360 | } | |
361 | r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker); | |
362 | if (r < 0) { | |
363 | if (r == -ENOENT) { | |
364 | info.status = r; | |
365 | cache.put(name, info, NULL); | |
366 | } | |
367 | return r; | |
368 | } | |
369 | info.status = 0; | |
370 | info.epoch = epoch; | |
371 | info.meta.mtime = mtime; | |
372 | info.meta.size = size; | |
373 | info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; | |
374 | if (objv_tracker) { | |
375 | info.flags |= CACHE_FLAG_OBJV; | |
376 | info.version = objv_tracker->read_version; | |
377 | } | |
378 | cache.put(name, info, NULL); | |
379 | done: | |
380 | if (psize) | |
381 | *psize = size; | |
382 | if (pmtime) | |
383 | *pmtime = mtime; | |
384 | if (pepoch) | |
385 | *pepoch = epoch; | |
386 | if (attrs) | |
387 | *attrs = info.xattrs; | |
388 | return 0; | |
389 | } | |
390 | ||
391 | int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op) | |
392 | { | |
393 | RGWCacheNotifyInfo info; | |
394 | ||
395 | info.op = op; | |
396 | ||
397 | info.obj_info = obj_info; | |
398 | info.obj = obj; | |
399 | bufferlist bl; | |
400 | encode(info, bl); | |
401 | return notify_svc->distribute(normal_name, bl); | |
402 | } | |
403 | ||
404 | int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, | |
405 | uint64_t cookie, | |
406 | uint64_t notifier_id, | |
407 | bufferlist& bl) | |
408 | { | |
409 | RGWCacheNotifyInfo info; | |
410 | ||
411 | try { | |
412 | auto iter = bl.cbegin(); | |
413 | decode(info, iter); | |
414 | } catch (buffer::end_of_buffer& err) { | |
415 | ldout(cct, 0) << "ERROR: got bad notification" << dendl; | |
416 | return -EIO; | |
417 | } catch (buffer::error& err) { | |
418 | ldout(cct, 0) << "ERROR: buffer::error" << dendl; | |
419 | return -EIO; | |
420 | } | |
421 | ||
422 | rgw_pool pool; | |
423 | string oid; | |
424 | normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid); | |
425 | string name = normal_name(pool, oid); | |
426 | ||
427 | switch (info.op) { | |
428 | case UPDATE_OBJ: | |
429 | cache.put(name, info.obj_info, NULL); | |
430 | break; | |
431 | case REMOVE_OBJ: | |
432 | cache.remove(name); | |
433 | break; | |
434 | default: | |
435 | ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl; | |
436 | return -EINVAL; | |
437 | } | |
438 | ||
439 | return 0; | |
440 | } | |
441 | ||
442 | void RGWSI_SysObj_Cache::set_enabled(bool status) | |
443 | { | |
444 | cache.set_enabled(status); | |
445 | } | |
446 | ||
447 | bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries, | |
448 | RGWChainedCache::Entry *chained_entry) | |
449 | { | |
450 | return cache.chain_cache_entry(cache_info_entries, chained_entry); | |
451 | } | |
452 | ||
453 | void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc) | |
454 | { | |
455 | cache.chain_cache(cc); | |
456 | } | |
457 | ||
458 | void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache *cc) | |
459 | { | |
460 | cache.unchain_cache(cc); | |
461 | } | |
462 | ||
463 | static void cache_list_dump_helper(Formatter* f, | |
464 | const std::string& name, | |
465 | const ceph::real_time mtime, | |
466 | const std::uint64_t size) | |
467 | { | |
468 | f->dump_string("name", name); | |
469 | f->dump_string("mtime", ceph::to_iso_8601(mtime)); | |
470 | f->dump_unsigned("size", size); | |
471 | } | |
472 | ||
473 | void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f) | |
474 | { | |
475 | cache.for_each( | |
476 | [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) { | |
477 | if (!filter || name.find(*filter) != name.npos) { | |
478 | cache_list_dump_helper(f, name, entry.info.meta.mtime, | |
479 | entry.info.meta.size); | |
480 | } | |
481 | }); | |
482 | } | |
483 | ||
484 | int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f) | |
485 | { | |
486 | if (const auto entry = cache.get(target)) { | |
487 | f->open_object_section("cache_entry"); | |
488 | f->dump_string("name", target.c_str()); | |
489 | entry->dump(f); | |
490 | f->close_section(); | |
491 | return true; | |
492 | } else { | |
493 | return false; | |
494 | } | |
495 | } | |
496 | ||
497 | int RGWSI_SysObj_Cache::call_erase(const std::string& target) | |
498 | { | |
499 | return cache.remove(target); | |
500 | } | |
501 | ||
502 | int RGWSI_SysObj_Cache::call_zap() | |
503 | { | |
504 | cache.invalidate_all(); | |
505 | return 0; | |
506 | } |