]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_notify.cc
import ceph 15.2.14
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2 4#include "include/random.h"
9f95a23c 5#include "include/Context.h"
11fdf7f2
TL
6#include "common/errno.h"
7
ec96510d 8#include "rgw/rgw_cache.h"
11fdf7f2
TL
9#include "svc_notify.h"
10#include "svc_finisher.h"
11#include "svc_zone.h"
12#include "svc_rados.h"
13
14#include "rgw/rgw_zone.h"
15
16#define dout_subsys ceph_subsys_rgw
17
// Base name of the control (watch/notify) objects; never modified at runtime.
static const string notify_oid_prefix = "notify";
19
20class RGWWatcher : public librados::WatchCtx2 {
21 CephContext *cct;
22 RGWSI_Notify *svc;
23 int index;
24 RGWSI_RADOS::Obj obj;
25 uint64_t watch_handle;
26 int register_ret{0};
27 librados::AioCompletion *register_completion{nullptr};
28
29 class C_ReinitWatch : public Context {
30 RGWWatcher *watcher;
31 public:
32 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
33 void finish(int r) override {
34 watcher->reinit();
35 }
36 };
ec96510d 37
11fdf7f2
TL
38public:
39 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
40 void handle_notify(uint64_t notify_id,
41 uint64_t cookie,
42 uint64_t notifier_id,
43 bufferlist& bl) override {
44 ldout(cct, 10) << "RGWWatcher::handle_notify() "
45 << " notify_id " << notify_id
46 << " cookie " << cookie
47 << " notifier " << notifier_id
48 << " bl.length()=" << bl.length() << dendl;
49
50 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
51 (svc->inject_notify_timeout_probability > 0 &&
52 (svc->inject_notify_timeout_probability >
53 ceph::util::generate_random_number(0.0, 1.0)))) {
54 ldout(cct, 0)
55 << "RGWWatcher::handle_notify() dropping notification! "
56 << "If this isn't what you want, set "
57 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
58 return;
59 }
60
61 svc->watch_cb(notify_id, cookie, notifier_id, bl);
62
63 bufferlist reply_bl; // empty reply payload
64 obj.notify_ack(notify_id, cookie, reply_bl);
65 }
66 void handle_error(uint64_t cookie, int err) override {
67 lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
68 << " err " << cpp_strerror(err) << dendl;
69 svc->remove_watcher(index);
70 svc->schedule_context(new C_ReinitWatch(this));
71 }
72
73 void reinit() {
74 int ret = unregister_watch();
75 if (ret < 0) {
76 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
77 return;
78 }
79 ret = register_watch();
80 if (ret < 0) {
81 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
82 return;
83 }
84 }
85
86 int unregister_watch() {
87 int r = svc->unwatch(obj, watch_handle);
88 if (r < 0) {
89 return r;
90 }
91 svc->remove_watcher(index);
92 return 0;
93 }
94
95 int register_watch_async() {
96 if (register_completion) {
97 register_completion->release();
98 register_completion = nullptr;
99 }
9f95a23c 100 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
11fdf7f2
TL
101 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
102 if (register_ret < 0) {
103 register_completion->release();
104 return register_ret;
105 }
106 return 0;
107 }
108
109 int register_watch_finish() {
110 if (register_ret < 0) {
111 return register_ret;
112 }
113 if (!register_completion) {
114 return -EINVAL;
115 }
9f95a23c 116 register_completion->wait_for_complete();
11fdf7f2
TL
117 int r = register_completion->get_return_value();
118 register_completion->release();
119 register_completion = nullptr;
120 if (r < 0) {
121 return r;
122 }
123 svc->add_watcher(index);
124 return 0;
125 }
126
127 int register_watch() {
128 int r = obj.watch(&watch_handle, this);
129 if (r < 0) {
130 return r;
131 }
132 svc->add_watcher(index);
133 return 0;
134 }
135};
136
137
138class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
139{
140 RGWSI_Notify *svc;
141public:
142 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
143 void call() override {
144 svc->shutdown();
145 }
146};
147
148string RGWSI_Notify::get_control_oid(int i)
149{
150 char buf[notify_oid_prefix.size() + 16];
151 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
152
153 return string(buf);
154}
155
adb31ebb 156// do not call pick_obj_control before init_watch
11fdf7f2
TL
157RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
158{
159 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
160
161 int i = r % num_watchers;
162 return notify_objs[i];
163}
164
165int RGWSI_Notify::init_watch()
166{
167 num_watchers = cct->_conf->rgw_num_control_oids;
168
169 bool compat_oid = (num_watchers == 0);
170
171 if (num_watchers <= 0)
172 num_watchers = 1;
173
174 watchers = new RGWWatcher *[num_watchers];
175
176 int error = 0;
177
178 notify_objs.resize(num_watchers);
179
180 for (int i=0; i < num_watchers; i++) {
181 string notify_oid;
182
183 if (!compat_oid) {
184 notify_oid = get_control_oid(i);
185 } else {
186 notify_oid = notify_oid_prefix;
187 }
188
494da23a 189 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
11fdf7f2
TL
190 auto& notify_obj = notify_objs[i];
191
192 int r = notify_obj.open();
193 if (r < 0) {
194 ldout(cct, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
195 return r;
196 }
197
198 librados::ObjectWriteOperation op;
199 op.create(false);
200 r = notify_obj.operate(&op, null_yield);
201 if (r < 0 && r != -EEXIST) {
202 ldout(cct, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
203 return r;
204 }
205
206 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
207 watchers[i] = watcher;
208
209 r = watcher->register_watch_async();
210 if (r < 0) {
211 ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
212 error = r;
213 continue;
214 }
215 }
216
217 for (int i = 0; i < num_watchers; ++i) {
218 int r = watchers[i]->register_watch_finish();
219 if (r < 0) {
220 ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
221 error = r;
222 }
223 }
224
225 if (error < 0) {
226 return error;
227 }
228
229 return 0;
230}
231
232void RGWSI_Notify::finalize_watch()
233{
234 for (int i = 0; i < num_watchers; i++) {
235 RGWWatcher *watcher = watchers[i];
236 watcher->unregister_watch();
237 delete watcher;
238 }
239
240 delete[] watchers;
241}
242
243int RGWSI_Notify::do_start()
244{
245 int r = zone_svc->start();
246 if (r < 0) {
247 return r;
248 }
249
250 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
251
252 r = rados_svc->start();
253 if (r < 0) {
254 return r;
255 }
256 r = finisher_svc->start();
257 if (r < 0) {
258 return r;
259 }
260
261 control_pool = zone_svc->get_zone_params().control_pool;
262
263 int ret = init_watch();
264 if (ret < 0) {
265 lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
266 return ret;
267 }
268
269 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
270 int handle;
271 finisher_svc->register_caller(shutdown_cb, &handle);
272 finisher_handle = handle;
273
274 return 0;
275}
276
277void RGWSI_Notify::shutdown()
278{
279 if (finalized) {
280 return;
281 }
282
283 if (finisher_handle) {
284 finisher_svc->unregister_caller(*finisher_handle);
285 }
286 finalize_watch();
287
288 delete shutdown_cb;
289
290 finalized = true;
291}
292
293RGWSI_Notify::~RGWSI_Notify()
294{
295 shutdown();
296}
297
298int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
299{
300 int r = obj.unwatch(watch_handle);
301 if (r < 0) {
302 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
303 return r;
304 }
494da23a 305 r = rados_svc->handle().watch_flush();
11fdf7f2
TL
306 if (r < 0) {
307 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
308 return r;
309 }
310 return 0;
311}
312
313void RGWSI_Notify::add_watcher(int i)
314{
315 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
9f95a23c 316 std::unique_lock l{watchers_lock};
11fdf7f2
TL
317 watchers_set.insert(i);
318 if (watchers_set.size() == (size_t)num_watchers) {
319 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
320 _set_enabled(true);
321 }
322}
323
324void RGWSI_Notify::remove_watcher(int i)
325{
326 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
9f95a23c 327 std::unique_lock l{watchers_lock};
11fdf7f2
TL
328 size_t orig_size = watchers_set.size();
329 watchers_set.erase(i);
330 if (orig_size == (size_t)num_watchers &&
331 watchers_set.size() < orig_size) { /* actually removed */
332 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
333 _set_enabled(false);
334 }
335}
336
337int RGWSI_Notify::watch_cb(uint64_t notify_id,
338 uint64_t cookie,
339 uint64_t notifier_id,
340 bufferlist& bl)
341{
9f95a23c 342 std::shared_lock l{watchers_lock};
11fdf7f2
TL
343 if (cb) {
344 return cb->watch_cb(notify_id, cookie, notifier_id, bl);
345 }
346 return 0;
347}
348
349void RGWSI_Notify::set_enabled(bool status)
350{
9f95a23c 351 std::unique_lock l{watchers_lock};
11fdf7f2
TL
352 _set_enabled(status);
353}
354
355void RGWSI_Notify::_set_enabled(bool status)
356{
357 enabled = status;
358 if (cb) {
359 cb->set_enabled(status);
360 }
361}
362
ec96510d
FG
363int RGWSI_Notify::distribute(const string& key, const RGWCacheNotifyInfo& cni,
364 optional_yield y)
11fdf7f2 365{
adb31ebb
TL
366 /* The RGW uses the control pool to store the watch notify objects.
367 The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch().
368 The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object.
369 In that case RGW will try to distribute the cache before it ran init_watch,
370 which will lead to division by 0 in pick_obj_control (num_watchers is 0).
371 */
372 if (num_watchers > 0) {
373 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
374
375 ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
ec96510d
FG
376 << " cni=" << cni << dendl;
377 return robust_notify(notify_obj, cni, y);
adb31ebb
TL
378 }
379 return 0;
11fdf7f2
TL
380}
381
ec96510d
FG
382int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj,
383 const RGWCacheNotifyInfo& cni,
9f95a23c 384 optional_yield y)
11fdf7f2 385{
ec96510d
FG
386 bufferlist bl;
387 encode(cni, bl);
11fdf7f2
TL
388
389 // First, try to send, without being fancy about it.
ec96510d 390 auto r = notify_obj.notify(bl, 0, nullptr, y);
11fdf7f2 391
11fdf7f2 392 if (r < 0) {
ec96510d
FG
393 BackTrace bt(1);
394 ldout(cct, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
395 << " Notify failed on object " << cni.obj << ": "
396 << cpp_strerror(-r) << dendl;
397 ldout(cct, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
398 << " Backtrace: " << ": "
399 << bt << dendl;
400 }
11fdf7f2 401
ec96510d
FG
402 // If we timed out, get serious.
403 if (r == -ETIMEDOUT) {
404 RGWCacheNotifyInfo info;
405 info.op = REMOVE_OBJ;
406 info.obj = cni.obj;
407 bufferlist retrybl;
408 encode(info, retrybl);
409
410 for (auto tries = 0u;
411 r == -ETIMEDOUT && tries < max_notify_retries;
412 ++tries) {
413 ldout(cct, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
414 << " Invalidating obj=" << info.obj << " tries="
415 << tries << dendl;
416 r = notify_obj.notify(bl, 0, nullptr, y);
11fdf7f2 417 if (r < 0) {
ec96510d
FG
418 ldout(cct, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
419 << " invalidation attempt " << tries << " failed: "
11fdf7f2 420 << cpp_strerror(-r) << dendl;
11fdf7f2
TL
421 }
422 }
423 }
424 return r;
425}
426
427void RGWSI_Notify::register_watch_cb(CB *_cb)
428{
9f95a23c 429 std::unique_lock l{watchers_lock};
11fdf7f2
TL
430 cb = _cb;
431 _set_enabled(enabled);
432}
433
434void RGWSI_Notify::schedule_context(Context *c)
435{
436 finisher_svc->schedule_context(c);
437}