]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_notify.cc
bump version to 15.2.13-pve1
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2 4#include "include/random.h"
9f95a23c 5#include "include/Context.h"
11fdf7f2
TL
6#include "common/errno.h"
7
8#include "svc_notify.h"
9#include "svc_finisher.h"
10#include "svc_zone.h"
11#include "svc_rados.h"
12
13#include "rgw/rgw_zone.h"
14
15#define dout_subsys ceph_subsys_rgw
16
// Base name of the control (watch/notify) objects; get_control_oid() appends
// ".<index>" to it, and the legacy single-object layout uses it verbatim.
// Never modified, so it is const.
static const std::string notify_oid_prefix = "notify";
18
19class RGWWatcher : public librados::WatchCtx2 {
20 CephContext *cct;
21 RGWSI_Notify *svc;
22 int index;
23 RGWSI_RADOS::Obj obj;
24 uint64_t watch_handle;
25 int register_ret{0};
26 librados::AioCompletion *register_completion{nullptr};
27
28 class C_ReinitWatch : public Context {
29 RGWWatcher *watcher;
30 public:
31 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
32 void finish(int r) override {
33 watcher->reinit();
34 }
35 };
36public:
37 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
38 void handle_notify(uint64_t notify_id,
39 uint64_t cookie,
40 uint64_t notifier_id,
41 bufferlist& bl) override {
42 ldout(cct, 10) << "RGWWatcher::handle_notify() "
43 << " notify_id " << notify_id
44 << " cookie " << cookie
45 << " notifier " << notifier_id
46 << " bl.length()=" << bl.length() << dendl;
47
48 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
49 (svc->inject_notify_timeout_probability > 0 &&
50 (svc->inject_notify_timeout_probability >
51 ceph::util::generate_random_number(0.0, 1.0)))) {
52 ldout(cct, 0)
53 << "RGWWatcher::handle_notify() dropping notification! "
54 << "If this isn't what you want, set "
55 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
56 return;
57 }
58
59 svc->watch_cb(notify_id, cookie, notifier_id, bl);
60
61 bufferlist reply_bl; // empty reply payload
62 obj.notify_ack(notify_id, cookie, reply_bl);
63 }
64 void handle_error(uint64_t cookie, int err) override {
65 lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
66 << " err " << cpp_strerror(err) << dendl;
67 svc->remove_watcher(index);
68 svc->schedule_context(new C_ReinitWatch(this));
69 }
70
71 void reinit() {
72 int ret = unregister_watch();
73 if (ret < 0) {
74 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
75 return;
76 }
77 ret = register_watch();
78 if (ret < 0) {
79 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
80 return;
81 }
82 }
83
84 int unregister_watch() {
85 int r = svc->unwatch(obj, watch_handle);
86 if (r < 0) {
87 return r;
88 }
89 svc->remove_watcher(index);
90 return 0;
91 }
92
93 int register_watch_async() {
94 if (register_completion) {
95 register_completion->release();
96 register_completion = nullptr;
97 }
9f95a23c 98 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
11fdf7f2
TL
99 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
100 if (register_ret < 0) {
101 register_completion->release();
102 return register_ret;
103 }
104 return 0;
105 }
106
107 int register_watch_finish() {
108 if (register_ret < 0) {
109 return register_ret;
110 }
111 if (!register_completion) {
112 return -EINVAL;
113 }
9f95a23c 114 register_completion->wait_for_complete();
11fdf7f2
TL
115 int r = register_completion->get_return_value();
116 register_completion->release();
117 register_completion = nullptr;
118 if (r < 0) {
119 return r;
120 }
121 svc->add_watcher(index);
122 return 0;
123 }
124
125 int register_watch() {
126 int r = obj.watch(&watch_handle, this);
127 if (r < 0) {
128 return r;
129 }
130 svc->add_watcher(index);
131 return 0;
132 }
133};
134
135
136class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
137{
138 RGWSI_Notify *svc;
139public:
140 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
141 void call() override {
142 svc->shutdown();
143 }
144};
145
146string RGWSI_Notify::get_control_oid(int i)
147{
148 char buf[notify_oid_prefix.size() + 16];
149 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
150
151 return string(buf);
152}
153
adb31ebb 154// do not call pick_obj_control before init_watch
11fdf7f2
TL
155RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
156{
157 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
158
159 int i = r % num_watchers;
160 return notify_objs[i];
161}
162
163int RGWSI_Notify::init_watch()
164{
165 num_watchers = cct->_conf->rgw_num_control_oids;
166
167 bool compat_oid = (num_watchers == 0);
168
169 if (num_watchers <= 0)
170 num_watchers = 1;
171
172 watchers = new RGWWatcher *[num_watchers];
173
174 int error = 0;
175
176 notify_objs.resize(num_watchers);
177
178 for (int i=0; i < num_watchers; i++) {
179 string notify_oid;
180
181 if (!compat_oid) {
182 notify_oid = get_control_oid(i);
183 } else {
184 notify_oid = notify_oid_prefix;
185 }
186
494da23a 187 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
11fdf7f2
TL
188 auto& notify_obj = notify_objs[i];
189
190 int r = notify_obj.open();
191 if (r < 0) {
192 ldout(cct, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
193 return r;
194 }
195
196 librados::ObjectWriteOperation op;
197 op.create(false);
198 r = notify_obj.operate(&op, null_yield);
199 if (r < 0 && r != -EEXIST) {
200 ldout(cct, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
201 return r;
202 }
203
204 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
205 watchers[i] = watcher;
206
207 r = watcher->register_watch_async();
208 if (r < 0) {
209 ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
210 error = r;
211 continue;
212 }
213 }
214
215 for (int i = 0; i < num_watchers; ++i) {
216 int r = watchers[i]->register_watch_finish();
217 if (r < 0) {
218 ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
219 error = r;
220 }
221 }
222
223 if (error < 0) {
224 return error;
225 }
226
227 return 0;
228}
229
230void RGWSI_Notify::finalize_watch()
231{
232 for (int i = 0; i < num_watchers; i++) {
233 RGWWatcher *watcher = watchers[i];
234 watcher->unregister_watch();
235 delete watcher;
236 }
237
238 delete[] watchers;
239}
240
241int RGWSI_Notify::do_start()
242{
243 int r = zone_svc->start();
244 if (r < 0) {
245 return r;
246 }
247
248 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
249
250 r = rados_svc->start();
251 if (r < 0) {
252 return r;
253 }
254 r = finisher_svc->start();
255 if (r < 0) {
256 return r;
257 }
258
259 control_pool = zone_svc->get_zone_params().control_pool;
260
261 int ret = init_watch();
262 if (ret < 0) {
263 lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
264 return ret;
265 }
266
267 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
268 int handle;
269 finisher_svc->register_caller(shutdown_cb, &handle);
270 finisher_handle = handle;
271
272 return 0;
273}
274
275void RGWSI_Notify::shutdown()
276{
277 if (finalized) {
278 return;
279 }
280
281 if (finisher_handle) {
282 finisher_svc->unregister_caller(*finisher_handle);
283 }
284 finalize_watch();
285
286 delete shutdown_cb;
287
288 finalized = true;
289}
290
291RGWSI_Notify::~RGWSI_Notify()
292{
293 shutdown();
294}
295
296int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
297{
298 int r = obj.unwatch(watch_handle);
299 if (r < 0) {
300 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
301 return r;
302 }
494da23a 303 r = rados_svc->handle().watch_flush();
11fdf7f2
TL
304 if (r < 0) {
305 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
306 return r;
307 }
308 return 0;
309}
310
311void RGWSI_Notify::add_watcher(int i)
312{
313 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
9f95a23c 314 std::unique_lock l{watchers_lock};
11fdf7f2
TL
315 watchers_set.insert(i);
316 if (watchers_set.size() == (size_t)num_watchers) {
317 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
318 _set_enabled(true);
319 }
320}
321
322void RGWSI_Notify::remove_watcher(int i)
323{
324 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
9f95a23c 325 std::unique_lock l{watchers_lock};
11fdf7f2
TL
326 size_t orig_size = watchers_set.size();
327 watchers_set.erase(i);
328 if (orig_size == (size_t)num_watchers &&
329 watchers_set.size() < orig_size) { /* actually removed */
330 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
331 _set_enabled(false);
332 }
333}
334
335int RGWSI_Notify::watch_cb(uint64_t notify_id,
336 uint64_t cookie,
337 uint64_t notifier_id,
338 bufferlist& bl)
339{
9f95a23c 340 std::shared_lock l{watchers_lock};
11fdf7f2
TL
341 if (cb) {
342 return cb->watch_cb(notify_id, cookie, notifier_id, bl);
343 }
344 return 0;
345}
346
347void RGWSI_Notify::set_enabled(bool status)
348{
9f95a23c 349 std::unique_lock l{watchers_lock};
11fdf7f2
TL
350 _set_enabled(status);
351}
352
353void RGWSI_Notify::_set_enabled(bool status)
354{
355 enabled = status;
356 if (cb) {
357 cb->set_enabled(status);
358 }
359}
360
9f95a23c
TL
361int RGWSI_Notify::distribute(const string& key, bufferlist& bl,
362 optional_yield y)
11fdf7f2 363{
adb31ebb
TL
364 /* The RGW uses the control pool to store the watch notify objects.
365 The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch().
366 The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object.
367 In that case RGW will try to distribute the cache before it ran init_watch,
368 which will lead to division by 0 in pick_obj_control (num_watchers is 0).
369 */
370 if (num_watchers > 0) {
371 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
372
373 ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
374 << " bl.length()=" << bl.length() << dendl;
375 return robust_notify(notify_obj, bl, y);
376 }
377 return 0;
11fdf7f2
TL
378}
379
9f95a23c
TL
380int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl,
381 optional_yield y)
11fdf7f2
TL
382{
383 // The reply of every machine that acks goes in here.
384 boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
385 bufferlist rbl;
386
387 // First, try to send, without being fancy about it.
9f95a23c 388 auto r = notify_obj.notify(bl, 0, &rbl, y);
11fdf7f2
TL
389
390 // If that doesn't work, get serious.
391 if (r < 0) {
392 ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
393 << cpp_strerror(-r) << dendl;
394
395
396 auto p = rbl.cbegin();
397 // Gather up the replies to the first attempt.
398 try {
399 uint32_t num_acks;
400 decode(num_acks, p);
401 // Doing this ourselves since we don't care about the payload;
402 for (auto i = 0u; i < num_acks; ++i) {
403 std::pair<uint64_t, uint64_t> id;
404 decode(id, p);
405 acks.insert(id);
406 ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
407 uint32_t blen;
408 decode(blen, p);
9f95a23c 409 p += blen;
11fdf7f2
TL
410 }
411 } catch (const buffer::error& e) {
412 ldout(cct, 0) << "robust_notify: notify response parse failed: "
413 << e.what() << dendl;
414 acks.clear(); // Throw away junk on failed parse.
415 }
416
417
418 // Every machine that fails to reply and hasn't acked a previous
419 // attempt goes in here.
420 boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
421
422 auto tries = 1u;
423 while (r < 0 && tries < max_notify_retries) {
424 ++tries;
425 rbl.clear();
426 // Reset the timeouts, we're only concerned with new ones.
427 timeouts.clear();
9f95a23c 428 r = notify_obj.notify(bl, 0, &rbl, y);
11fdf7f2
TL
429 if (r < 0) {
430 ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
431 << cpp_strerror(-r) << dendl;
432 p = rbl.begin();
433 try {
434 uint32_t num_acks;
435 decode(num_acks, p);
436 // Not only do we not care about the payload, but we don't
437 // want to empty the container; we just want to augment it
438 // with any new members.
439 for (auto i = 0u; i < num_acks; ++i) {
440 std::pair<uint64_t, uint64_t> id;
441 decode(id, p);
442 auto ir = acks.insert(id);
443 if (ir.second) {
444 ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
445 }
446 uint32_t blen;
447 decode(blen, p);
9f95a23c 448 p += blen;
11fdf7f2
TL
449 }
450
451 uint32_t num_timeouts;
452 decode(num_timeouts, p);
453 for (auto i = 0u; i < num_timeouts; ++i) {
454 std::pair<uint64_t, uint64_t> id;
455 decode(id, p);
456 // Only track timeouts from hosts that haven't acked previously.
457 if (acks.find(id) != acks.cend()) {
458 ldout(cct, 20) << "robust_notify: " << id << " timed out."
459 << dendl;
460 timeouts.insert(id);
461 }
462 }
463 } catch (const buffer::error& e) {
464 ldout(cct, 0) << "robust_notify: notify response parse failed: "
465 << e.what() << dendl;
466 continue;
467 }
468 // If we got a good parse and timeouts is empty, that means
469 // everyone who timed out in one call received the update in a
470 // previous one.
471 if (timeouts.empty()) {
472 r = 0;
473 }
474 }
475 }
476 }
477 return r;
478}
479
480void RGWSI_Notify::register_watch_cb(CB *_cb)
481{
9f95a23c 482 std::unique_lock l{watchers_lock};
11fdf7f2
TL
483 cb = _cb;
484 _set_enabled(enabled);
485}
486
487void RGWSI_Notify::schedule_context(Context *c)
488{
489 finisher_svc->schedule_context(c);
490}