]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/services/svc_notify.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/random.h"
5 #include "include/Context.h"
6 #include "common/errno.h"
7
8 #include "svc_notify.h"
9 #include "svc_finisher.h"
10 #include "svc_zone.h"
11 #include "svc_rados.h"
12
13 #include "rgw/rgw_zone.h"
14
15 #define dout_subsys ceph_subsys_rgw
16
// Base name of the control objects; init_watch() uses it verbatim in compat
// mode and get_control_oid() appends ".<index>" otherwise. Never modified.
static const std::string notify_oid_prefix = "notify";
18
19 class RGWWatcher : public librados::WatchCtx2 {
20 CephContext *cct;
21 RGWSI_Notify *svc;
22 int index;
23 RGWSI_RADOS::Obj obj;
24 uint64_t watch_handle;
25 int register_ret{0};
26 librados::AioCompletion *register_completion{nullptr};
27
28 class C_ReinitWatch : public Context {
29 RGWWatcher *watcher;
30 public:
31 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
32 void finish(int r) override {
33 watcher->reinit();
34 }
35 };
36 public:
37 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
38 void handle_notify(uint64_t notify_id,
39 uint64_t cookie,
40 uint64_t notifier_id,
41 bufferlist& bl) override {
42 ldout(cct, 10) << "RGWWatcher::handle_notify() "
43 << " notify_id " << notify_id
44 << " cookie " << cookie
45 << " notifier " << notifier_id
46 << " bl.length()=" << bl.length() << dendl;
47
48 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
49 (svc->inject_notify_timeout_probability > 0 &&
50 (svc->inject_notify_timeout_probability >
51 ceph::util::generate_random_number(0.0, 1.0)))) {
52 ldout(cct, 0)
53 << "RGWWatcher::handle_notify() dropping notification! "
54 << "If this isn't what you want, set "
55 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
56 return;
57 }
58
59 svc->watch_cb(notify_id, cookie, notifier_id, bl);
60
61 bufferlist reply_bl; // empty reply payload
62 obj.notify_ack(notify_id, cookie, reply_bl);
63 }
64 void handle_error(uint64_t cookie, int err) override {
65 lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
66 << " err " << cpp_strerror(err) << dendl;
67 svc->remove_watcher(index);
68 svc->schedule_context(new C_ReinitWatch(this));
69 }
70
71 void reinit() {
72 int ret = unregister_watch();
73 if (ret < 0) {
74 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
75 return;
76 }
77 ret = register_watch();
78 if (ret < 0) {
79 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
80 return;
81 }
82 }
83
84 int unregister_watch() {
85 int r = svc->unwatch(obj, watch_handle);
86 if (r < 0) {
87 return r;
88 }
89 svc->remove_watcher(index);
90 return 0;
91 }
92
93 int register_watch_async() {
94 if (register_completion) {
95 register_completion->release();
96 register_completion = nullptr;
97 }
98 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
99 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
100 if (register_ret < 0) {
101 register_completion->release();
102 return register_ret;
103 }
104 return 0;
105 }
106
107 int register_watch_finish() {
108 if (register_ret < 0) {
109 return register_ret;
110 }
111 if (!register_completion) {
112 return -EINVAL;
113 }
114 register_completion->wait_for_complete();
115 int r = register_completion->get_return_value();
116 register_completion->release();
117 register_completion = nullptr;
118 if (r < 0) {
119 return r;
120 }
121 svc->add_watcher(index);
122 return 0;
123 }
124
125 int register_watch() {
126 int r = obj.watch(&watch_handle, this);
127 if (r < 0) {
128 return r;
129 }
130 svc->add_watcher(index);
131 return 0;
132 }
133 };
134
135
136 class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
137 {
138 RGWSI_Notify *svc;
139 public:
140 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
141 void call() override {
142 svc->shutdown();
143 }
144 };
145
146 string RGWSI_Notify::get_control_oid(int i)
147 {
148 char buf[notify_oid_prefix.size() + 16];
149 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
150
151 return string(buf);
152 }
153
154 RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
155 {
156 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
157
158 int i = r % num_watchers;
159 return notify_objs[i];
160 }
161
162 int RGWSI_Notify::init_watch()
163 {
164 num_watchers = cct->_conf->rgw_num_control_oids;
165
166 bool compat_oid = (num_watchers == 0);
167
168 if (num_watchers <= 0)
169 num_watchers = 1;
170
171 watchers = new RGWWatcher *[num_watchers];
172
173 int error = 0;
174
175 notify_objs.resize(num_watchers);
176
177 for (int i=0; i < num_watchers; i++) {
178 string notify_oid;
179
180 if (!compat_oid) {
181 notify_oid = get_control_oid(i);
182 } else {
183 notify_oid = notify_oid_prefix;
184 }
185
186 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
187 auto& notify_obj = notify_objs[i];
188
189 int r = notify_obj.open();
190 if (r < 0) {
191 ldout(cct, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
192 return r;
193 }
194
195 librados::ObjectWriteOperation op;
196 op.create(false);
197 r = notify_obj.operate(&op, null_yield);
198 if (r < 0 && r != -EEXIST) {
199 ldout(cct, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
200 return r;
201 }
202
203 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
204 watchers[i] = watcher;
205
206 r = watcher->register_watch_async();
207 if (r < 0) {
208 ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
209 error = r;
210 continue;
211 }
212 }
213
214 for (int i = 0; i < num_watchers; ++i) {
215 int r = watchers[i]->register_watch_finish();
216 if (r < 0) {
217 ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
218 error = r;
219 }
220 }
221
222 if (error < 0) {
223 return error;
224 }
225
226 return 0;
227 }
228
229 void RGWSI_Notify::finalize_watch()
230 {
231 for (int i = 0; i < num_watchers; i++) {
232 RGWWatcher *watcher = watchers[i];
233 watcher->unregister_watch();
234 delete watcher;
235 }
236
237 delete[] watchers;
238 }
239
240 int RGWSI_Notify::do_start()
241 {
242 int r = zone_svc->start();
243 if (r < 0) {
244 return r;
245 }
246
247 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
248
249 r = rados_svc->start();
250 if (r < 0) {
251 return r;
252 }
253 r = finisher_svc->start();
254 if (r < 0) {
255 return r;
256 }
257
258 control_pool = zone_svc->get_zone_params().control_pool;
259
260 int ret = init_watch();
261 if (ret < 0) {
262 lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
263 return ret;
264 }
265
266 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
267 int handle;
268 finisher_svc->register_caller(shutdown_cb, &handle);
269 finisher_handle = handle;
270
271 return 0;
272 }
273
274 void RGWSI_Notify::shutdown()
275 {
276 if (finalized) {
277 return;
278 }
279
280 if (finisher_handle) {
281 finisher_svc->unregister_caller(*finisher_handle);
282 }
283 finalize_watch();
284
285 delete shutdown_cb;
286
287 finalized = true;
288 }
289
290 RGWSI_Notify::~RGWSI_Notify()
291 {
292 shutdown();
293 }
294
295 int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
296 {
297 int r = obj.unwatch(watch_handle);
298 if (r < 0) {
299 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
300 return r;
301 }
302 r = rados_svc->handle().watch_flush();
303 if (r < 0) {
304 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
305 return r;
306 }
307 return 0;
308 }
309
310 void RGWSI_Notify::add_watcher(int i)
311 {
312 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
313 std::unique_lock l{watchers_lock};
314 watchers_set.insert(i);
315 if (watchers_set.size() == (size_t)num_watchers) {
316 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
317 _set_enabled(true);
318 }
319 }
320
321 void RGWSI_Notify::remove_watcher(int i)
322 {
323 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
324 std::unique_lock l{watchers_lock};
325 size_t orig_size = watchers_set.size();
326 watchers_set.erase(i);
327 if (orig_size == (size_t)num_watchers &&
328 watchers_set.size() < orig_size) { /* actually removed */
329 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
330 _set_enabled(false);
331 }
332 }
333
334 int RGWSI_Notify::watch_cb(uint64_t notify_id,
335 uint64_t cookie,
336 uint64_t notifier_id,
337 bufferlist& bl)
338 {
339 std::shared_lock l{watchers_lock};
340 if (cb) {
341 return cb->watch_cb(notify_id, cookie, notifier_id, bl);
342 }
343 return 0;
344 }
345
346 void RGWSI_Notify::set_enabled(bool status)
347 {
348 std::unique_lock l{watchers_lock};
349 _set_enabled(status);
350 }
351
352 void RGWSI_Notify::_set_enabled(bool status)
353 {
354 enabled = status;
355 if (cb) {
356 cb->set_enabled(status);
357 }
358 }
359
360 int RGWSI_Notify::distribute(const string& key, bufferlist& bl,
361 optional_yield y)
362 {
363 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
364
365 ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
366 << " bl.length()=" << bl.length() << dendl;
367 return robust_notify(notify_obj, bl, y);
368 }
369
370 int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl,
371 optional_yield y)
372 {
373 // The reply of every machine that acks goes in here.
374 boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
375 bufferlist rbl;
376
377 // First, try to send, without being fancy about it.
378 auto r = notify_obj.notify(bl, 0, &rbl, y);
379
380 // If that doesn't work, get serious.
381 if (r < 0) {
382 ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
383 << cpp_strerror(-r) << dendl;
384
385
386 auto p = rbl.cbegin();
387 // Gather up the replies to the first attempt.
388 try {
389 uint32_t num_acks;
390 decode(num_acks, p);
391 // Doing this ourselves since we don't care about the payload;
392 for (auto i = 0u; i < num_acks; ++i) {
393 std::pair<uint64_t, uint64_t> id;
394 decode(id, p);
395 acks.insert(id);
396 ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
397 uint32_t blen;
398 decode(blen, p);
399 p += blen;
400 }
401 } catch (const buffer::error& e) {
402 ldout(cct, 0) << "robust_notify: notify response parse failed: "
403 << e.what() << dendl;
404 acks.clear(); // Throw away junk on failed parse.
405 }
406
407
408 // Every machine that fails to reply and hasn't acked a previous
409 // attempt goes in here.
410 boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
411
412 auto tries = 1u;
413 while (r < 0 && tries < max_notify_retries) {
414 ++tries;
415 rbl.clear();
416 // Reset the timeouts, we're only concerned with new ones.
417 timeouts.clear();
418 r = notify_obj.notify(bl, 0, &rbl, y);
419 if (r < 0) {
420 ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
421 << cpp_strerror(-r) << dendl;
422 p = rbl.begin();
423 try {
424 uint32_t num_acks;
425 decode(num_acks, p);
426 // Not only do we not care about the payload, but we don't
427 // want to empty the container; we just want to augment it
428 // with any new members.
429 for (auto i = 0u; i < num_acks; ++i) {
430 std::pair<uint64_t, uint64_t> id;
431 decode(id, p);
432 auto ir = acks.insert(id);
433 if (ir.second) {
434 ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
435 }
436 uint32_t blen;
437 decode(blen, p);
438 p += blen;
439 }
440
441 uint32_t num_timeouts;
442 decode(num_timeouts, p);
443 for (auto i = 0u; i < num_timeouts; ++i) {
444 std::pair<uint64_t, uint64_t> id;
445 decode(id, p);
446 // Only track timeouts from hosts that haven't acked previously.
447 if (acks.find(id) != acks.cend()) {
448 ldout(cct, 20) << "robust_notify: " << id << " timed out."
449 << dendl;
450 timeouts.insert(id);
451 }
452 }
453 } catch (const buffer::error& e) {
454 ldout(cct, 0) << "robust_notify: notify response parse failed: "
455 << e.what() << dendl;
456 continue;
457 }
458 // If we got a good parse and timeouts is empty, that means
459 // everyone who timed out in one call received the update in a
460 // previous one.
461 if (timeouts.empty()) {
462 r = 0;
463 }
464 }
465 }
466 }
467 return r;
468 }
469
470 void RGWSI_Notify::register_watch_cb(CB *_cb)
471 {
472 std::unique_lock l{watchers_lock};
473 cb = _cb;
474 _set_enabled(enabled);
475 }
476
477 void RGWSI_Notify::schedule_context(Context *c)
478 {
479 finisher_svc->schedule_context(c);
480 }