]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_notify.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2 4#include "include/random.h"
9f95a23c 5#include "include/Context.h"
11fdf7f2
TL
6#include "common/errno.h"
7
1e59de90 8#include "rgw_cache.h"
11fdf7f2
TL
9#include "svc_notify.h"
10#include "svc_finisher.h"
11#include "svc_zone.h"
12#include "svc_rados.h"
13
1e59de90 14#include "rgw_zone.h"
11fdf7f2
TL
15
16#define dout_subsys ceph_subsys_rgw
17
20effc67
TL
18using namespace std;
19
11fdf7f2
TL
// Base name of the control objects. Per-index objects are named
// "<prefix>.<i>" (see get_control_oid()); when rgw_num_control_oids is 0 the
// bare prefix is used as the single control object name (see init_watch()).
// Only ever read, so it is const.
static const std::string notify_oid_prefix = "notify";
21
20effc67
TL
22RGWSI_Notify::~RGWSI_Notify()
23{
24 shutdown();
25}
26
27
b3b6e05e 28class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
11fdf7f2
TL
29 CephContext *cct;
30 RGWSI_Notify *svc;
31 int index;
32 RGWSI_RADOS::Obj obj;
33 uint64_t watch_handle;
34 int register_ret{0};
1e59de90 35 bool unregister_done{false};
11fdf7f2
TL
36 librados::AioCompletion *register_completion{nullptr};
37
38 class C_ReinitWatch : public Context {
39 RGWWatcher *watcher;
40 public:
41 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
42 void finish(int r) override {
43 watcher->reinit();
44 }
45 };
b3b6e05e 46
522d829b
TL
47 CephContext *get_cct() const override { return cct; }
48 unsigned get_subsys() const override { return dout_subsys; }
49 std::ostream& gen_prefix(std::ostream& out) const override {
50 return out << "rgw watcher librados: ";
51 }
b3b6e05e 52
11fdf7f2
TL
53public:
54 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
55 void handle_notify(uint64_t notify_id,
56 uint64_t cookie,
57 uint64_t notifier_id,
58 bufferlist& bl) override {
b3b6e05e 59 ldpp_dout(this, 10) << "RGWWatcher::handle_notify() "
11fdf7f2
TL
60 << " notify_id " << notify_id
61 << " cookie " << cookie
62 << " notifier " << notifier_id
63 << " bl.length()=" << bl.length() << dendl;
64
65 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
66 (svc->inject_notify_timeout_probability > 0 &&
67 (svc->inject_notify_timeout_probability >
68 ceph::util::generate_random_number(0.0, 1.0)))) {
b3b6e05e 69 ldpp_dout(this, 0)
11fdf7f2
TL
70 << "RGWWatcher::handle_notify() dropping notification! "
71 << "If this isn't what you want, set "
72 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
73 return;
74 }
75
b3b6e05e 76 svc->watch_cb(this, notify_id, cookie, notifier_id, bl);
11fdf7f2
TL
77
78 bufferlist reply_bl; // empty reply payload
79 obj.notify_ack(notify_id, cookie, reply_bl);
80 }
81 void handle_error(uint64_t cookie, int err) override {
20effc67 82 ldpp_dout(this, -1) << "RGWWatcher::handle_error cookie " << cookie
11fdf7f2
TL
83 << " err " << cpp_strerror(err) << dendl;
84 svc->remove_watcher(index);
85 svc->schedule_context(new C_ReinitWatch(this));
86 }
87
88 void reinit() {
1e59de90
TL
89 if(!unregister_done) {
90 int ret = unregister_watch();
91 if (ret < 0) {
92 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
93 }
11fdf7f2 94 }
1e59de90 95 int ret = register_watch();
11fdf7f2
TL
96 if (ret < 0) {
97 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
1e59de90 98 svc->schedule_context(new C_ReinitWatch(this));
11fdf7f2
TL
99 return;
100 }
101 }
102
103 int unregister_watch() {
104 int r = svc->unwatch(obj, watch_handle);
1e59de90 105 unregister_done = true;
11fdf7f2
TL
106 if (r < 0) {
107 return r;
108 }
109 svc->remove_watcher(index);
110 return 0;
111 }
112
113 int register_watch_async() {
114 if (register_completion) {
115 register_completion->release();
116 register_completion = nullptr;
117 }
9f95a23c 118 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
11fdf7f2
TL
119 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
120 if (register_ret < 0) {
121 register_completion->release();
122 return register_ret;
123 }
124 return 0;
125 }
126
127 int register_watch_finish() {
128 if (register_ret < 0) {
129 return register_ret;
130 }
131 if (!register_completion) {
132 return -EINVAL;
133 }
9f95a23c 134 register_completion->wait_for_complete();
11fdf7f2
TL
135 int r = register_completion->get_return_value();
136 register_completion->release();
137 register_completion = nullptr;
138 if (r < 0) {
139 return r;
140 }
141 svc->add_watcher(index);
1e59de90 142 unregister_done = false;
11fdf7f2
TL
143 return 0;
144 }
145
146 int register_watch() {
147 int r = obj.watch(&watch_handle, this);
148 if (r < 0) {
149 return r;
150 }
151 svc->add_watcher(index);
1e59de90 152 unregister_done = false;
11fdf7f2
TL
153 return 0;
154 }
155};
156
157
158class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
159{
160 RGWSI_Notify *svc;
161public:
162 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
163 void call() override {
164 svc->shutdown();
165 }
166};
167
168string RGWSI_Notify::get_control_oid(int i)
169{
170 char buf[notify_oid_prefix.size() + 16];
171 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
172
173 return string(buf);
174}
175
adb31ebb 176// do not call pick_obj_control before init_watch
11fdf7f2
TL
177RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
178{
179 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
180
181 int i = r % num_watchers;
182 return notify_objs[i];
183}
184
b3b6e05e 185int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
11fdf7f2
TL
186{
187 num_watchers = cct->_conf->rgw_num_control_oids;
188
189 bool compat_oid = (num_watchers == 0);
190
191 if (num_watchers <= 0)
192 num_watchers = 1;
193
194 watchers = new RGWWatcher *[num_watchers];
195
196 int error = 0;
197
198 notify_objs.resize(num_watchers);
199
200 for (int i=0; i < num_watchers; i++) {
201 string notify_oid;
202
203 if (!compat_oid) {
204 notify_oid = get_control_oid(i);
205 } else {
206 notify_oid = notify_oid_prefix;
207 }
208
494da23a 209 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
11fdf7f2
TL
210 auto& notify_obj = notify_objs[i];
211
b3b6e05e 212 int r = notify_obj.open(dpp);
11fdf7f2 213 if (r < 0) {
b3b6e05e 214 ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
11fdf7f2
TL
215 return r;
216 }
217
218 librados::ObjectWriteOperation op;
219 op.create(false);
b3b6e05e 220 r = notify_obj.operate(dpp, &op, y);
11fdf7f2 221 if (r < 0 && r != -EEXIST) {
b3b6e05e 222 ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
11fdf7f2
TL
223 return r;
224 }
225
226 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
227 watchers[i] = watcher;
228
229 r = watcher->register_watch_async();
230 if (r < 0) {
b3b6e05e 231 ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
11fdf7f2
TL
232 error = r;
233 continue;
234 }
235 }
236
237 for (int i = 0; i < num_watchers; ++i) {
238 int r = watchers[i]->register_watch_finish();
239 if (r < 0) {
b3b6e05e 240 ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl;
11fdf7f2
TL
241 error = r;
242 }
243 }
244
245 if (error < 0) {
246 return error;
247 }
248
249 return 0;
250}
251
252void RGWSI_Notify::finalize_watch()
253{
254 for (int i = 0; i < num_watchers; i++) {
255 RGWWatcher *watcher = watchers[i];
aee94f69
TL
256 if (watchers_set.find(i) != watchers_set.end())
257 watcher->unregister_watch();
11fdf7f2
TL
258 delete watcher;
259 }
260
261 delete[] watchers;
262}
263
b3b6e05e 264int RGWSI_Notify::do_start(optional_yield y, const DoutPrefixProvider *dpp)
11fdf7f2 265{
b3b6e05e 266 int r = zone_svc->start(y, dpp);
11fdf7f2
TL
267 if (r < 0) {
268 return r;
269 }
270
271 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
272
b3b6e05e 273 r = rados_svc->start(y, dpp);
11fdf7f2
TL
274 if (r < 0) {
275 return r;
276 }
b3b6e05e 277 r = finisher_svc->start(y, dpp);
11fdf7f2
TL
278 if (r < 0) {
279 return r;
280 }
281
aee94f69
TL
282 inject_notify_timeout_probability =
283 cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
284 max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
285
11fdf7f2
TL
286 control_pool = zone_svc->get_zone_params().control_pool;
287
b3b6e05e 288 int ret = init_watch(dpp, y);
11fdf7f2 289 if (ret < 0) {
20effc67 290 ldpp_dout(dpp, -1) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
11fdf7f2
TL
291 return ret;
292 }
293
294 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
295 int handle;
296 finisher_svc->register_caller(shutdown_cb, &handle);
297 finisher_handle = handle;
298
299 return 0;
300}
301
302void RGWSI_Notify::shutdown()
303{
304 if (finalized) {
305 return;
306 }
307
308 if (finisher_handle) {
309 finisher_svc->unregister_caller(*finisher_handle);
310 }
311 finalize_watch();
312
313 delete shutdown_cb;
314
315 finalized = true;
316}
317
11fdf7f2
TL
318int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
319{
320 int r = obj.unwatch(watch_handle);
321 if (r < 0) {
322 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
323 return r;
324 }
494da23a 325 r = rados_svc->handle().watch_flush();
11fdf7f2
TL
326 if (r < 0) {
327 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
328 return r;
329 }
330 return 0;
331}
332
333void RGWSI_Notify::add_watcher(int i)
334{
335 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
9f95a23c 336 std::unique_lock l{watchers_lock};
11fdf7f2
TL
337 watchers_set.insert(i);
338 if (watchers_set.size() == (size_t)num_watchers) {
339 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
340 _set_enabled(true);
341 }
342}
343
344void RGWSI_Notify::remove_watcher(int i)
345{
346 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
9f95a23c 347 std::unique_lock l{watchers_lock};
11fdf7f2
TL
348 size_t orig_size = watchers_set.size();
349 watchers_set.erase(i);
350 if (orig_size == (size_t)num_watchers &&
351 watchers_set.size() < orig_size) { /* actually removed */
352 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
353 _set_enabled(false);
354 }
355}
356
b3b6e05e
TL
357int RGWSI_Notify::watch_cb(const DoutPrefixProvider *dpp,
358 uint64_t notify_id,
11fdf7f2
TL
359 uint64_t cookie,
360 uint64_t notifier_id,
361 bufferlist& bl)
362{
9f95a23c 363 std::shared_lock l{watchers_lock};
11fdf7f2 364 if (cb) {
b3b6e05e 365 return cb->watch_cb(dpp, notify_id, cookie, notifier_id, bl);
11fdf7f2
TL
366 }
367 return 0;
368}
369
370void RGWSI_Notify::set_enabled(bool status)
371{
9f95a23c 372 std::unique_lock l{watchers_lock};
11fdf7f2
TL
373 _set_enabled(status);
374}
375
376void RGWSI_Notify::_set_enabled(bool status)
377{
378 enabled = status;
379 if (cb) {
380 cb->set_enabled(status);
381 }
382}
383
522d829b
TL
384int RGWSI_Notify::distribute(const DoutPrefixProvider *dpp, const string& key,
385 const RGWCacheNotifyInfo& cni,
9f95a23c 386 optional_yield y)
11fdf7f2 387{
adb31ebb
TL
388 /* The RGW uses the control pool to store the watch notify objects.
389 The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch().
390 The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object.
391 In that case RGW will try to distribute the cache before it ran init_watch,
392 which will lead to division by 0 in pick_obj_control (num_watchers is 0).
393 */
394 if (num_watchers > 0) {
395 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
396
b3b6e05e 397 ldpp_dout(dpp, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
522d829b
TL
398 << " cni=" << cni << dendl;
399 return robust_notify(dpp, notify_obj, cni, y);
adb31ebb
TL
400 }
401 return 0;
11fdf7f2
TL
402}
403
aee94f69
TL
404namespace librados {
405
406static std::ostream& operator<<(std::ostream& out, const notify_timeout_t& t)
407{
408 return out << t.notifier_id << ':' << t.cookie;
409}
410
411} // namespace librados
412
413using timeout_vector = std::vector<librados::notify_timeout_t>;
414
415static timeout_vector decode_timeouts(const bufferlist& bl)
416{
417 using ceph::decode;
418 auto p = bl.begin();
419
420 // decode and discard the acks
421 uint32_t num_acks;
422 decode(num_acks, p);
423 for (auto i = 0u; i < num_acks; ++i) {
424 std::pair<uint64_t, uint64_t> id;
425 decode(id, p);
426 // discard the payload
427 uint32_t blen;
428 decode(blen, p);
429 p += blen;
430 }
431
432 // decode and return the timeouts
433 uint32_t num_timeouts;
434 decode(num_timeouts, p);
435
436 timeout_vector timeouts;
437 for (auto i = 0u; i < num_timeouts; ++i) {
438 std::pair<uint64_t, uint64_t> id;
439 decode(id, p);
440 timeouts.push_back({id.first, id.second});
441 }
442 return timeouts;
443}
444
522d829b
TL
445int RGWSI_Notify::robust_notify(const DoutPrefixProvider *dpp,
446 RGWSI_RADOS::Obj& notify_obj,
447 const RGWCacheNotifyInfo& cni,
9f95a23c 448 optional_yield y)
11fdf7f2 449{
aee94f69 450 bufferlist bl, rbl;
522d829b 451 encode(cni, bl);
11fdf7f2
TL
452
453 // First, try to send, without being fancy about it.
aee94f69 454 auto r = notify_obj.notify(dpp, bl, 0, &rbl, y);
11fdf7f2 455
11fdf7f2 456 if (r < 0) {
aee94f69
TL
457 timeout_vector timeouts;
458 try {
459 timeouts = decode_timeouts(rbl);
460 } catch (const buffer::error& e) {
461 ldpp_dout(dpp, 0) << "robust_notify failed to decode notify response: "
462 << e.what() << dendl;
463 }
464
522d829b 465 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
aee94f69 466 << " Watchers " << timeouts << " did not respond."
522d829b
TL
467 << " Notify failed on object " << cni.obj << ": "
468 << cpp_strerror(-r) << dendl;
469 }
11fdf7f2 470
522d829b
TL
471 // If we timed out, get serious.
472 if (r == -ETIMEDOUT) {
473 RGWCacheNotifyInfo info;
20effc67 474 info.op = INVALIDATE_OBJ;
522d829b
TL
475 info.obj = cni.obj;
476 bufferlist retrybl;
477 encode(info, retrybl);
478
479 for (auto tries = 0u;
480 r == -ETIMEDOUT && tries < max_notify_retries;
481 ++tries) {
482 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
483 << " Invalidating obj=" << info.obj << " tries="
484 << tries << dendl;
aee94f69 485 r = notify_obj.notify(dpp, retrybl, 0, &rbl, y);
11fdf7f2 486 if (r < 0) {
aee94f69
TL
487 timeout_vector timeouts;
488 try {
489 timeouts = decode_timeouts(rbl);
490 } catch (const buffer::error& e) {
491 ldpp_dout(dpp, 0) << "robust_notify failed to decode notify response: "
492 << e.what() << dendl;
493 }
494
522d829b 495 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
aee94f69
TL
496 << " Watchers " << timeouts << " did not respond."
497 << " Invalidation attempt " << tries << " failed: "
522d829b 498 << cpp_strerror(-r) << dendl;
11fdf7f2
TL
499 }
500 }
501 }
502 return r;
503}
504
505void RGWSI_Notify::register_watch_cb(CB *_cb)
506{
9f95a23c 507 std::unique_lock l{watchers_lock};
11fdf7f2
TL
508 cb = _cb;
509 _set_enabled(enabled);
510}
511
512void RGWSI_Notify::schedule_context(Context *c)
513{
514 finisher_svc->schedule_context(c);
515}