]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_notify.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
CommitLineData
11fdf7f2
TL
1#include "include/random.h"
2#include "common/errno.h"
3
4#include "svc_notify.h"
5#include "svc_finisher.h"
6#include "svc_zone.h"
7#include "svc_rados.h"
8
9#include "rgw/rgw_zone.h"
10
// Debug subsystem used by the ldout()/lderr() logging calls in this file.
#define dout_subsys ceph_subsys_rgw

// Base name of the control objects. get_control_oid() appends ".<index>";
// in compat mode (rgw_num_control_oids == 0) the name is used as-is.
static string notify_oid_prefix{"notify"};
14
15class RGWWatcher : public librados::WatchCtx2 {
16 CephContext *cct;
17 RGWSI_Notify *svc;
18 int index;
19 RGWSI_RADOS::Obj obj;
20 uint64_t watch_handle;
21 int register_ret{0};
22 librados::AioCompletion *register_completion{nullptr};
23
24 class C_ReinitWatch : public Context {
25 RGWWatcher *watcher;
26 public:
27 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
28 void finish(int r) override {
29 watcher->reinit();
30 }
31 };
32public:
33 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
34 void handle_notify(uint64_t notify_id,
35 uint64_t cookie,
36 uint64_t notifier_id,
37 bufferlist& bl) override {
38 ldout(cct, 10) << "RGWWatcher::handle_notify() "
39 << " notify_id " << notify_id
40 << " cookie " << cookie
41 << " notifier " << notifier_id
42 << " bl.length()=" << bl.length() << dendl;
43
44 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
45 (svc->inject_notify_timeout_probability > 0 &&
46 (svc->inject_notify_timeout_probability >
47 ceph::util::generate_random_number(0.0, 1.0)))) {
48 ldout(cct, 0)
49 << "RGWWatcher::handle_notify() dropping notification! "
50 << "If this isn't what you want, set "
51 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
52 return;
53 }
54
55 svc->watch_cb(notify_id, cookie, notifier_id, bl);
56
57 bufferlist reply_bl; // empty reply payload
58 obj.notify_ack(notify_id, cookie, reply_bl);
59 }
60 void handle_error(uint64_t cookie, int err) override {
61 lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
62 << " err " << cpp_strerror(err) << dendl;
63 svc->remove_watcher(index);
64 svc->schedule_context(new C_ReinitWatch(this));
65 }
66
67 void reinit() {
68 int ret = unregister_watch();
69 if (ret < 0) {
70 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
71 return;
72 }
73 ret = register_watch();
74 if (ret < 0) {
75 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
76 return;
77 }
78 }
79
80 int unregister_watch() {
81 int r = svc->unwatch(obj, watch_handle);
82 if (r < 0) {
83 return r;
84 }
85 svc->remove_watcher(index);
86 return 0;
87 }
88
89 int register_watch_async() {
90 if (register_completion) {
91 register_completion->release();
92 register_completion = nullptr;
93 }
94 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
95 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
96 if (register_ret < 0) {
97 register_completion->release();
98 return register_ret;
99 }
100 return 0;
101 }
102
103 int register_watch_finish() {
104 if (register_ret < 0) {
105 return register_ret;
106 }
107 if (!register_completion) {
108 return -EINVAL;
109 }
110 register_completion->wait_for_safe();
111 int r = register_completion->get_return_value();
112 register_completion->release();
113 register_completion = nullptr;
114 if (r < 0) {
115 return r;
116 }
117 svc->add_watcher(index);
118 return 0;
119 }
120
121 int register_watch() {
122 int r = obj.watch(&watch_handle, this);
123 if (r < 0) {
124 return r;
125 }
126 svc->add_watcher(index);
127 return 0;
128 }
129};
130
131
132class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
133{
134 RGWSI_Notify *svc;
135public:
136 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
137 void call() override {
138 svc->shutdown();
139 }
140};
141
142string RGWSI_Notify::get_control_oid(int i)
143{
144 char buf[notify_oid_prefix.size() + 16];
145 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
146
147 return string(buf);
148}
149
150RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
151{
152 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
153
154 int i = r % num_watchers;
155 return notify_objs[i];
156}
157
158int RGWSI_Notify::init_watch()
159{
160 num_watchers = cct->_conf->rgw_num_control_oids;
161
162 bool compat_oid = (num_watchers == 0);
163
164 if (num_watchers <= 0)
165 num_watchers = 1;
166
167 watchers = new RGWWatcher *[num_watchers];
168
169 int error = 0;
170
171 notify_objs.resize(num_watchers);
172
173 for (int i=0; i < num_watchers; i++) {
174 string notify_oid;
175
176 if (!compat_oid) {
177 notify_oid = get_control_oid(i);
178 } else {
179 notify_oid = notify_oid_prefix;
180 }
181
494da23a 182 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
11fdf7f2
TL
183 auto& notify_obj = notify_objs[i];
184
185 int r = notify_obj.open();
186 if (r < 0) {
187 ldout(cct, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
188 return r;
189 }
190
191 librados::ObjectWriteOperation op;
192 op.create(false);
193 r = notify_obj.operate(&op, null_yield);
194 if (r < 0 && r != -EEXIST) {
195 ldout(cct, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
196 return r;
197 }
198
199 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
200 watchers[i] = watcher;
201
202 r = watcher->register_watch_async();
203 if (r < 0) {
204 ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
205 error = r;
206 continue;
207 }
208 }
209
210 for (int i = 0; i < num_watchers; ++i) {
211 int r = watchers[i]->register_watch_finish();
212 if (r < 0) {
213 ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
214 error = r;
215 }
216 }
217
218 if (error < 0) {
219 return error;
220 }
221
222 return 0;
223}
224
225void RGWSI_Notify::finalize_watch()
226{
227 for (int i = 0; i < num_watchers; i++) {
228 RGWWatcher *watcher = watchers[i];
229 watcher->unregister_watch();
230 delete watcher;
231 }
232
233 delete[] watchers;
234}
235
236int RGWSI_Notify::do_start()
237{
238 int r = zone_svc->start();
239 if (r < 0) {
240 return r;
241 }
242
243 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
244
245 r = rados_svc->start();
246 if (r < 0) {
247 return r;
248 }
249 r = finisher_svc->start();
250 if (r < 0) {
251 return r;
252 }
253
254 control_pool = zone_svc->get_zone_params().control_pool;
255
256 int ret = init_watch();
257 if (ret < 0) {
258 lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
259 return ret;
260 }
261
262 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
263 int handle;
264 finisher_svc->register_caller(shutdown_cb, &handle);
265 finisher_handle = handle;
266
267 return 0;
268}
269
270void RGWSI_Notify::shutdown()
271{
272 if (finalized) {
273 return;
274 }
275
276 if (finisher_handle) {
277 finisher_svc->unregister_caller(*finisher_handle);
278 }
279 finalize_watch();
280
281 delete shutdown_cb;
282
283 finalized = true;
284}
285
286RGWSI_Notify::~RGWSI_Notify()
287{
288 shutdown();
289}
290
291int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
292{
293 int r = obj.unwatch(watch_handle);
294 if (r < 0) {
295 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
296 return r;
297 }
494da23a 298 r = rados_svc->handle().watch_flush();
11fdf7f2
TL
299 if (r < 0) {
300 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
301 return r;
302 }
303 return 0;
304}
305
306void RGWSI_Notify::add_watcher(int i)
307{
308 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
309 RWLock::WLocker l(watchers_lock);
310 watchers_set.insert(i);
311 if (watchers_set.size() == (size_t)num_watchers) {
312 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
313 _set_enabled(true);
314 }
315}
316
317void RGWSI_Notify::remove_watcher(int i)
318{
319 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
320 RWLock::WLocker l(watchers_lock);
321 size_t orig_size = watchers_set.size();
322 watchers_set.erase(i);
323 if (orig_size == (size_t)num_watchers &&
324 watchers_set.size() < orig_size) { /* actually removed */
325 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
326 _set_enabled(false);
327 }
328}
329
330int RGWSI_Notify::watch_cb(uint64_t notify_id,
331 uint64_t cookie,
332 uint64_t notifier_id,
333 bufferlist& bl)
334{
335 RWLock::RLocker l(watchers_lock);
336 if (cb) {
337 return cb->watch_cb(notify_id, cookie, notifier_id, bl);
338 }
339 return 0;
340}
341
342void RGWSI_Notify::set_enabled(bool status)
343{
344 RWLock::WLocker l(watchers_lock);
345 _set_enabled(status);
346}
347
348void RGWSI_Notify::_set_enabled(bool status)
349{
350 enabled = status;
351 if (cb) {
352 cb->set_enabled(status);
353 }
354}
355
356int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
357{
358 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
359
360 ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
361 << " bl.length()=" << bl.length() << dendl;
362 return robust_notify(notify_obj, bl);
363}
364
365int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
366{
367 // The reply of every machine that acks goes in here.
368 boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
369 bufferlist rbl;
370
371 // First, try to send, without being fancy about it.
372 auto r = notify_obj.notify(bl, 0, &rbl);
373
374 // If that doesn't work, get serious.
375 if (r < 0) {
376 ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
377 << cpp_strerror(-r) << dendl;
378
379
380 auto p = rbl.cbegin();
381 // Gather up the replies to the first attempt.
382 try {
383 uint32_t num_acks;
384 decode(num_acks, p);
385 // Doing this ourselves since we don't care about the payload;
386 for (auto i = 0u; i < num_acks; ++i) {
387 std::pair<uint64_t, uint64_t> id;
388 decode(id, p);
389 acks.insert(id);
390 ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
391 uint32_t blen;
392 decode(blen, p);
393 p.advance(blen);
394 }
395 } catch (const buffer::error& e) {
396 ldout(cct, 0) << "robust_notify: notify response parse failed: "
397 << e.what() << dendl;
398 acks.clear(); // Throw away junk on failed parse.
399 }
400
401
402 // Every machine that fails to reply and hasn't acked a previous
403 // attempt goes in here.
404 boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
405
406 auto tries = 1u;
407 while (r < 0 && tries < max_notify_retries) {
408 ++tries;
409 rbl.clear();
410 // Reset the timeouts, we're only concerned with new ones.
411 timeouts.clear();
412 r = notify_obj.notify(bl, 0, &rbl);
413 if (r < 0) {
414 ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
415 << cpp_strerror(-r) << dendl;
416 p = rbl.begin();
417 try {
418 uint32_t num_acks;
419 decode(num_acks, p);
420 // Not only do we not care about the payload, but we don't
421 // want to empty the container; we just want to augment it
422 // with any new members.
423 for (auto i = 0u; i < num_acks; ++i) {
424 std::pair<uint64_t, uint64_t> id;
425 decode(id, p);
426 auto ir = acks.insert(id);
427 if (ir.second) {
428 ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
429 }
430 uint32_t blen;
431 decode(blen, p);
432 p.advance(blen);
433 }
434
435 uint32_t num_timeouts;
436 decode(num_timeouts, p);
437 for (auto i = 0u; i < num_timeouts; ++i) {
438 std::pair<uint64_t, uint64_t> id;
439 decode(id, p);
440 // Only track timeouts from hosts that haven't acked previously.
441 if (acks.find(id) != acks.cend()) {
442 ldout(cct, 20) << "robust_notify: " << id << " timed out."
443 << dendl;
444 timeouts.insert(id);
445 }
446 }
447 } catch (const buffer::error& e) {
448 ldout(cct, 0) << "robust_notify: notify response parse failed: "
449 << e.what() << dendl;
450 continue;
451 }
452 // If we got a good parse and timeouts is empty, that means
453 // everyone who timed out in one call received the update in a
454 // previous one.
455 if (timeouts.empty()) {
456 r = 0;
457 }
458 }
459 }
460 }
461 return r;
462}
463
464void RGWSI_Notify::register_watch_cb(CB *_cb)
465{
466 RWLock::WLocker l(watchers_lock);
467 cb = _cb;
468 _set_enabled(enabled);
469}
470
471void RGWSI_Notify::schedule_context(Context *c)
472{
473 finisher_svc->schedule_context(c);
474}