]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_notify.cc
import ceph 16.2.6
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2 4#include "include/random.h"
9f95a23c 5#include "include/Context.h"
11fdf7f2
TL
6#include "common/errno.h"
7
522d829b 8#include "rgw/rgw_cache.h"
11fdf7f2
TL
9#include "svc_notify.h"
10#include "svc_finisher.h"
11#include "svc_zone.h"
12#include "svc_rados.h"
13
14#include "rgw/rgw_zone.h"
15
16#define dout_subsys ceph_subsys_rgw
17
18static string notify_oid_prefix = "notify";
19
b3b6e05e 20class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
11fdf7f2
TL
21 CephContext *cct;
22 RGWSI_Notify *svc;
23 int index;
24 RGWSI_RADOS::Obj obj;
25 uint64_t watch_handle;
26 int register_ret{0};
27 librados::AioCompletion *register_completion{nullptr};
28
29 class C_ReinitWatch : public Context {
30 RGWWatcher *watcher;
31 public:
32 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
33 void finish(int r) override {
34 watcher->reinit();
35 }
36 };
b3b6e05e 37
522d829b
TL
38 CephContext *get_cct() const override { return cct; }
39 unsigned get_subsys() const override { return dout_subsys; }
40 std::ostream& gen_prefix(std::ostream& out) const override {
41 return out << "rgw watcher librados: ";
42 }
b3b6e05e 43
11fdf7f2
TL
44public:
45 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
46 void handle_notify(uint64_t notify_id,
47 uint64_t cookie,
48 uint64_t notifier_id,
49 bufferlist& bl) override {
b3b6e05e 50 ldpp_dout(this, 10) << "RGWWatcher::handle_notify() "
11fdf7f2
TL
51 << " notify_id " << notify_id
52 << " cookie " << cookie
53 << " notifier " << notifier_id
54 << " bl.length()=" << bl.length() << dendl;
55
56 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
57 (svc->inject_notify_timeout_probability > 0 &&
58 (svc->inject_notify_timeout_probability >
59 ceph::util::generate_random_number(0.0, 1.0)))) {
b3b6e05e 60 ldpp_dout(this, 0)
11fdf7f2
TL
61 << "RGWWatcher::handle_notify() dropping notification! "
62 << "If this isn't what you want, set "
63 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
64 return;
65 }
66
b3b6e05e 67 svc->watch_cb(this, notify_id, cookie, notifier_id, bl);
11fdf7f2
TL
68
69 bufferlist reply_bl; // empty reply payload
70 obj.notify_ack(notify_id, cookie, reply_bl);
71 }
72 void handle_error(uint64_t cookie, int err) override {
73 lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
74 << " err " << cpp_strerror(err) << dendl;
75 svc->remove_watcher(index);
76 svc->schedule_context(new C_ReinitWatch(this));
77 }
78
79 void reinit() {
80 int ret = unregister_watch();
81 if (ret < 0) {
82 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
83 return;
84 }
85 ret = register_watch();
86 if (ret < 0) {
87 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
88 return;
89 }
90 }
91
92 int unregister_watch() {
93 int r = svc->unwatch(obj, watch_handle);
94 if (r < 0) {
95 return r;
96 }
97 svc->remove_watcher(index);
98 return 0;
99 }
100
101 int register_watch_async() {
102 if (register_completion) {
103 register_completion->release();
104 register_completion = nullptr;
105 }
9f95a23c 106 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
11fdf7f2
TL
107 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
108 if (register_ret < 0) {
109 register_completion->release();
110 return register_ret;
111 }
112 return 0;
113 }
114
115 int register_watch_finish() {
116 if (register_ret < 0) {
117 return register_ret;
118 }
119 if (!register_completion) {
120 return -EINVAL;
121 }
9f95a23c 122 register_completion->wait_for_complete();
11fdf7f2
TL
123 int r = register_completion->get_return_value();
124 register_completion->release();
125 register_completion = nullptr;
126 if (r < 0) {
127 return r;
128 }
129 svc->add_watcher(index);
130 return 0;
131 }
132
133 int register_watch() {
134 int r = obj.watch(&watch_handle, this);
135 if (r < 0) {
136 return r;
137 }
138 svc->add_watcher(index);
139 return 0;
140 }
141};
142
143
144class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
145{
146 RGWSI_Notify *svc;
147public:
148 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
149 void call() override {
150 svc->shutdown();
151 }
152};
153
154string RGWSI_Notify::get_control_oid(int i)
155{
156 char buf[notify_oid_prefix.size() + 16];
157 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
158
159 return string(buf);
160}
161
adb31ebb 162// do not call pick_obj_control before init_watch
11fdf7f2
TL
163RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
164{
165 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
166
167 int i = r % num_watchers;
168 return notify_objs[i];
169}
170
b3b6e05e 171int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
11fdf7f2
TL
172{
173 num_watchers = cct->_conf->rgw_num_control_oids;
174
175 bool compat_oid = (num_watchers == 0);
176
177 if (num_watchers <= 0)
178 num_watchers = 1;
179
180 watchers = new RGWWatcher *[num_watchers];
181
182 int error = 0;
183
184 notify_objs.resize(num_watchers);
185
186 for (int i=0; i < num_watchers; i++) {
187 string notify_oid;
188
189 if (!compat_oid) {
190 notify_oid = get_control_oid(i);
191 } else {
192 notify_oid = notify_oid_prefix;
193 }
194
494da23a 195 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
11fdf7f2
TL
196 auto& notify_obj = notify_objs[i];
197
b3b6e05e 198 int r = notify_obj.open(dpp);
11fdf7f2 199 if (r < 0) {
b3b6e05e 200 ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
11fdf7f2
TL
201 return r;
202 }
203
204 librados::ObjectWriteOperation op;
205 op.create(false);
b3b6e05e 206 r = notify_obj.operate(dpp, &op, y);
11fdf7f2 207 if (r < 0 && r != -EEXIST) {
b3b6e05e 208 ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
11fdf7f2
TL
209 return r;
210 }
211
212 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
213 watchers[i] = watcher;
214
215 r = watcher->register_watch_async();
216 if (r < 0) {
b3b6e05e 217 ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
11fdf7f2
TL
218 error = r;
219 continue;
220 }
221 }
222
223 for (int i = 0; i < num_watchers; ++i) {
224 int r = watchers[i]->register_watch_finish();
225 if (r < 0) {
b3b6e05e 226 ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl;
11fdf7f2
TL
227 error = r;
228 }
229 }
230
231 if (error < 0) {
232 return error;
233 }
234
235 return 0;
236}
237
238void RGWSI_Notify::finalize_watch()
239{
240 for (int i = 0; i < num_watchers; i++) {
241 RGWWatcher *watcher = watchers[i];
242 watcher->unregister_watch();
243 delete watcher;
244 }
245
246 delete[] watchers;
247}
248
b3b6e05e 249int RGWSI_Notify::do_start(optional_yield y, const DoutPrefixProvider *dpp)
11fdf7f2 250{
b3b6e05e 251 int r = zone_svc->start(y, dpp);
11fdf7f2
TL
252 if (r < 0) {
253 return r;
254 }
255
256 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
257
b3b6e05e 258 r = rados_svc->start(y, dpp);
11fdf7f2
TL
259 if (r < 0) {
260 return r;
261 }
b3b6e05e 262 r = finisher_svc->start(y, dpp);
11fdf7f2
TL
263 if (r < 0) {
264 return r;
265 }
266
267 control_pool = zone_svc->get_zone_params().control_pool;
268
b3b6e05e 269 int ret = init_watch(dpp, y);
11fdf7f2
TL
270 if (ret < 0) {
271 lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
272 return ret;
273 }
274
275 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
276 int handle;
277 finisher_svc->register_caller(shutdown_cb, &handle);
278 finisher_handle = handle;
279
280 return 0;
281}
282
283void RGWSI_Notify::shutdown()
284{
285 if (finalized) {
286 return;
287 }
288
289 if (finisher_handle) {
290 finisher_svc->unregister_caller(*finisher_handle);
291 }
292 finalize_watch();
293
294 delete shutdown_cb;
295
296 finalized = true;
297}
298
299RGWSI_Notify::~RGWSI_Notify()
300{
301 shutdown();
302}
303
304int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
305{
306 int r = obj.unwatch(watch_handle);
307 if (r < 0) {
308 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
309 return r;
310 }
494da23a 311 r = rados_svc->handle().watch_flush();
11fdf7f2
TL
312 if (r < 0) {
313 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
314 return r;
315 }
316 return 0;
317}
318
319void RGWSI_Notify::add_watcher(int i)
320{
321 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
9f95a23c 322 std::unique_lock l{watchers_lock};
11fdf7f2
TL
323 watchers_set.insert(i);
324 if (watchers_set.size() == (size_t)num_watchers) {
325 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
326 _set_enabled(true);
327 }
328}
329
330void RGWSI_Notify::remove_watcher(int i)
331{
332 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
9f95a23c 333 std::unique_lock l{watchers_lock};
11fdf7f2
TL
334 size_t orig_size = watchers_set.size();
335 watchers_set.erase(i);
336 if (orig_size == (size_t)num_watchers &&
337 watchers_set.size() < orig_size) { /* actually removed */
338 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
339 _set_enabled(false);
340 }
341}
342
b3b6e05e
TL
343int RGWSI_Notify::watch_cb(const DoutPrefixProvider *dpp,
344 uint64_t notify_id,
11fdf7f2
TL
345 uint64_t cookie,
346 uint64_t notifier_id,
347 bufferlist& bl)
348{
9f95a23c 349 std::shared_lock l{watchers_lock};
11fdf7f2 350 if (cb) {
b3b6e05e 351 return cb->watch_cb(dpp, notify_id, cookie, notifier_id, bl);
11fdf7f2
TL
352 }
353 return 0;
354}
355
356void RGWSI_Notify::set_enabled(bool status)
357{
9f95a23c 358 std::unique_lock l{watchers_lock};
11fdf7f2
TL
359 _set_enabled(status);
360}
361
362void RGWSI_Notify::_set_enabled(bool status)
363{
364 enabled = status;
365 if (cb) {
366 cb->set_enabled(status);
367 }
368}
369
522d829b
TL
370int RGWSI_Notify::distribute(const DoutPrefixProvider *dpp, const string& key,
371 const RGWCacheNotifyInfo& cni,
9f95a23c 372 optional_yield y)
11fdf7f2 373{
adb31ebb
TL
374 /* The RGW uses the control pool to store the watch notify objects.
375 The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch().
376 The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object.
377 In that case RGW will try to distribute the cache before it ran init_watch,
378 which will lead to division by 0 in pick_obj_control (num_watchers is 0).
379 */
380 if (num_watchers > 0) {
381 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
382
b3b6e05e 383 ldpp_dout(dpp, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
522d829b
TL
384 << " cni=" << cni << dendl;
385 return robust_notify(dpp, notify_obj, cni, y);
adb31ebb
TL
386 }
387 return 0;
11fdf7f2
TL
388}
389
522d829b
TL
390int RGWSI_Notify::robust_notify(const DoutPrefixProvider *dpp,
391 RGWSI_RADOS::Obj& notify_obj,
392 const RGWCacheNotifyInfo& cni,
9f95a23c 393 optional_yield y)
11fdf7f2 394{
522d829b
TL
395 bufferlist bl;
396 encode(cni, bl);
11fdf7f2
TL
397
398 // First, try to send, without being fancy about it.
522d829b 399 auto r = notify_obj.notify(dpp, bl, 0, nullptr, y);
11fdf7f2 400
11fdf7f2 401 if (r < 0) {
522d829b
TL
402 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
403 << " Notify failed on object " << cni.obj << ": "
404 << cpp_strerror(-r) << dendl;
405 }
11fdf7f2 406
522d829b
TL
407 // If we timed out, get serious.
408 if (r == -ETIMEDOUT) {
409 RGWCacheNotifyInfo info;
410 info.op = REMOVE_OBJ;
411 info.obj = cni.obj;
412 bufferlist retrybl;
413 encode(info, retrybl);
414
415 for (auto tries = 0u;
416 r == -ETIMEDOUT && tries < max_notify_retries;
417 ++tries) {
418 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
419 << " Invalidating obj=" << info.obj << " tries="
420 << tries << dendl;
421 r = notify_obj.notify(dpp, bl, 0, nullptr, y);
11fdf7f2 422 if (r < 0) {
522d829b
TL
423 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
424 << " invalidation attempt " << tries << " failed: "
425 << cpp_strerror(-r) << dendl;
11fdf7f2
TL
426 }
427 }
428 }
429 return r;
430}
431
432void RGWSI_Notify::register_watch_cb(CB *_cb)
433{
9f95a23c 434 std::unique_lock l{watchers_lock};
11fdf7f2
TL
435 cb = _cb;
436 _set_enabled(enabled);
437}
438
439void RGWSI_Notify::schedule_context(Context *c)
440{
441 finisher_svc->schedule_context(c);
442}