]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/services/svc_notify.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / services / svc_notify.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2 4#include "include/random.h"
9f95a23c 5#include "include/Context.h"
11fdf7f2
TL
6#include "common/errno.h"
7
522d829b 8#include "rgw/rgw_cache.h"
11fdf7f2
TL
9#include "svc_notify.h"
10#include "svc_finisher.h"
11#include "svc_zone.h"
12#include "svc_rados.h"
13
14#include "rgw/rgw_zone.h"
15
16#define dout_subsys ceph_subsys_rgw
17
20effc67
TL
18using namespace std;
19
11fdf7f2
TL
// Base name of the RADOS control objects used for cache watch/notify.
static string notify_oid_prefix{"notify"};
21
20effc67
TL
22RGWSI_Notify::~RGWSI_Notify()
23{
24 shutdown();
25}
26
27
b3b6e05e 28class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
11fdf7f2
TL
29 CephContext *cct;
30 RGWSI_Notify *svc;
31 int index;
32 RGWSI_RADOS::Obj obj;
33 uint64_t watch_handle;
34 int register_ret{0};
35 librados::AioCompletion *register_completion{nullptr};
36
37 class C_ReinitWatch : public Context {
38 RGWWatcher *watcher;
39 public:
40 explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
41 void finish(int r) override {
42 watcher->reinit();
43 }
44 };
b3b6e05e 45
522d829b
TL
46 CephContext *get_cct() const override { return cct; }
47 unsigned get_subsys() const override { return dout_subsys; }
48 std::ostream& gen_prefix(std::ostream& out) const override {
49 return out << "rgw watcher librados: ";
50 }
b3b6e05e 51
11fdf7f2
TL
52public:
53 RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
54 void handle_notify(uint64_t notify_id,
55 uint64_t cookie,
56 uint64_t notifier_id,
57 bufferlist& bl) override {
b3b6e05e 58 ldpp_dout(this, 10) << "RGWWatcher::handle_notify() "
11fdf7f2
TL
59 << " notify_id " << notify_id
60 << " cookie " << cookie
61 << " notifier " << notifier_id
62 << " bl.length()=" << bl.length() << dendl;
63
64 if (unlikely(svc->inject_notify_timeout_probability == 1) ||
65 (svc->inject_notify_timeout_probability > 0 &&
66 (svc->inject_notify_timeout_probability >
67 ceph::util::generate_random_number(0.0, 1.0)))) {
b3b6e05e 68 ldpp_dout(this, 0)
11fdf7f2
TL
69 << "RGWWatcher::handle_notify() dropping notification! "
70 << "If this isn't what you want, set "
71 << "rgw_inject_notify_timeout_probability to zero!" << dendl;
72 return;
73 }
74
b3b6e05e 75 svc->watch_cb(this, notify_id, cookie, notifier_id, bl);
11fdf7f2
TL
76
77 bufferlist reply_bl; // empty reply payload
78 obj.notify_ack(notify_id, cookie, reply_bl);
79 }
80 void handle_error(uint64_t cookie, int err) override {
20effc67 81 ldpp_dout(this, -1) << "RGWWatcher::handle_error cookie " << cookie
11fdf7f2
TL
82 << " err " << cpp_strerror(err) << dendl;
83 svc->remove_watcher(index);
84 svc->schedule_context(new C_ReinitWatch(this));
85 }
86
87 void reinit() {
88 int ret = unregister_watch();
89 if (ret < 0) {
90 ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
91 return;
92 }
93 ret = register_watch();
94 if (ret < 0) {
95 ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
96 return;
97 }
98 }
99
100 int unregister_watch() {
101 int r = svc->unwatch(obj, watch_handle);
102 if (r < 0) {
103 return r;
104 }
105 svc->remove_watcher(index);
106 return 0;
107 }
108
109 int register_watch_async() {
110 if (register_completion) {
111 register_completion->release();
112 register_completion = nullptr;
113 }
9f95a23c 114 register_completion = librados::Rados::aio_create_completion(nullptr, nullptr);
11fdf7f2
TL
115 register_ret = obj.aio_watch(register_completion, &watch_handle, this);
116 if (register_ret < 0) {
117 register_completion->release();
118 return register_ret;
119 }
120 return 0;
121 }
122
123 int register_watch_finish() {
124 if (register_ret < 0) {
125 return register_ret;
126 }
127 if (!register_completion) {
128 return -EINVAL;
129 }
9f95a23c 130 register_completion->wait_for_complete();
11fdf7f2
TL
131 int r = register_completion->get_return_value();
132 register_completion->release();
133 register_completion = nullptr;
134 if (r < 0) {
135 return r;
136 }
137 svc->add_watcher(index);
138 return 0;
139 }
140
141 int register_watch() {
142 int r = obj.watch(&watch_handle, this);
143 if (r < 0) {
144 return r;
145 }
146 svc->add_watcher(index);
147 return 0;
148 }
149};
150
151
152class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
153{
154 RGWSI_Notify *svc;
155public:
156 RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
157 void call() override {
158 svc->shutdown();
159 }
160};
161
162string RGWSI_Notify::get_control_oid(int i)
163{
164 char buf[notify_oid_prefix.size() + 16];
165 snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
166
167 return string(buf);
168}
169
adb31ebb 170// do not call pick_obj_control before init_watch
11fdf7f2
TL
171RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
172{
173 uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
174
175 int i = r % num_watchers;
176 return notify_objs[i];
177}
178
b3b6e05e 179int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
11fdf7f2
TL
180{
181 num_watchers = cct->_conf->rgw_num_control_oids;
182
183 bool compat_oid = (num_watchers == 0);
184
185 if (num_watchers <= 0)
186 num_watchers = 1;
187
188 watchers = new RGWWatcher *[num_watchers];
189
190 int error = 0;
191
192 notify_objs.resize(num_watchers);
193
194 for (int i=0; i < num_watchers; i++) {
195 string notify_oid;
196
197 if (!compat_oid) {
198 notify_oid = get_control_oid(i);
199 } else {
200 notify_oid = notify_oid_prefix;
201 }
202
494da23a 203 notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
11fdf7f2
TL
204 auto& notify_obj = notify_objs[i];
205
b3b6e05e 206 int r = notify_obj.open(dpp);
11fdf7f2 207 if (r < 0) {
b3b6e05e 208 ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
11fdf7f2
TL
209 return r;
210 }
211
212 librados::ObjectWriteOperation op;
213 op.create(false);
b3b6e05e 214 r = notify_obj.operate(dpp, &op, y);
11fdf7f2 215 if (r < 0 && r != -EEXIST) {
b3b6e05e 216 ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
11fdf7f2
TL
217 return r;
218 }
219
220 RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
221 watchers[i] = watcher;
222
223 r = watcher->register_watch_async();
224 if (r < 0) {
b3b6e05e 225 ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
11fdf7f2
TL
226 error = r;
227 continue;
228 }
229 }
230
231 for (int i = 0; i < num_watchers; ++i) {
232 int r = watchers[i]->register_watch_finish();
233 if (r < 0) {
b3b6e05e 234 ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl;
11fdf7f2
TL
235 error = r;
236 }
237 }
238
239 if (error < 0) {
240 return error;
241 }
242
243 return 0;
244}
245
246void RGWSI_Notify::finalize_watch()
247{
248 for (int i = 0; i < num_watchers; i++) {
249 RGWWatcher *watcher = watchers[i];
250 watcher->unregister_watch();
251 delete watcher;
252 }
253
254 delete[] watchers;
255}
256
b3b6e05e 257int RGWSI_Notify::do_start(optional_yield y, const DoutPrefixProvider *dpp)
11fdf7f2 258{
b3b6e05e 259 int r = zone_svc->start(y, dpp);
11fdf7f2
TL
260 if (r < 0) {
261 return r;
262 }
263
264 assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
265
b3b6e05e 266 r = rados_svc->start(y, dpp);
11fdf7f2
TL
267 if (r < 0) {
268 return r;
269 }
b3b6e05e 270 r = finisher_svc->start(y, dpp);
11fdf7f2
TL
271 if (r < 0) {
272 return r;
273 }
274
275 control_pool = zone_svc->get_zone_params().control_pool;
276
b3b6e05e 277 int ret = init_watch(dpp, y);
11fdf7f2 278 if (ret < 0) {
20effc67 279 ldpp_dout(dpp, -1) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
11fdf7f2
TL
280 return ret;
281 }
282
283 shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
284 int handle;
285 finisher_svc->register_caller(shutdown_cb, &handle);
286 finisher_handle = handle;
287
288 return 0;
289}
290
291void RGWSI_Notify::shutdown()
292{
293 if (finalized) {
294 return;
295 }
296
297 if (finisher_handle) {
298 finisher_svc->unregister_caller(*finisher_handle);
299 }
300 finalize_watch();
301
302 delete shutdown_cb;
303
304 finalized = true;
305}
306
11fdf7f2
TL
307int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
308{
309 int r = obj.unwatch(watch_handle);
310 if (r < 0) {
311 ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
312 return r;
313 }
494da23a 314 r = rados_svc->handle().watch_flush();
11fdf7f2
TL
315 if (r < 0) {
316 ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
317 return r;
318 }
319 return 0;
320}
321
322void RGWSI_Notify::add_watcher(int i)
323{
324 ldout(cct, 20) << "add_watcher() i=" << i << dendl;
9f95a23c 325 std::unique_lock l{watchers_lock};
11fdf7f2
TL
326 watchers_set.insert(i);
327 if (watchers_set.size() == (size_t)num_watchers) {
328 ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
329 _set_enabled(true);
330 }
331}
332
333void RGWSI_Notify::remove_watcher(int i)
334{
335 ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
9f95a23c 336 std::unique_lock l{watchers_lock};
11fdf7f2
TL
337 size_t orig_size = watchers_set.size();
338 watchers_set.erase(i);
339 if (orig_size == (size_t)num_watchers &&
340 watchers_set.size() < orig_size) { /* actually removed */
341 ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
342 _set_enabled(false);
343 }
344}
345
b3b6e05e
TL
346int RGWSI_Notify::watch_cb(const DoutPrefixProvider *dpp,
347 uint64_t notify_id,
11fdf7f2
TL
348 uint64_t cookie,
349 uint64_t notifier_id,
350 bufferlist& bl)
351{
9f95a23c 352 std::shared_lock l{watchers_lock};
11fdf7f2 353 if (cb) {
b3b6e05e 354 return cb->watch_cb(dpp, notify_id, cookie, notifier_id, bl);
11fdf7f2
TL
355 }
356 return 0;
357}
358
359void RGWSI_Notify::set_enabled(bool status)
360{
9f95a23c 361 std::unique_lock l{watchers_lock};
11fdf7f2
TL
362 _set_enabled(status);
363}
364
365void RGWSI_Notify::_set_enabled(bool status)
366{
367 enabled = status;
368 if (cb) {
369 cb->set_enabled(status);
370 }
371}
372
522d829b
TL
373int RGWSI_Notify::distribute(const DoutPrefixProvider *dpp, const string& key,
374 const RGWCacheNotifyInfo& cni,
9f95a23c 375 optional_yield y)
11fdf7f2 376{
adb31ebb
TL
377 /* The RGW uses the control pool to store the watch notify objects.
378 The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch().
379 The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object.
380 In that case RGW will try to distribute the cache before it ran init_watch,
381 which will lead to division by 0 in pick_obj_control (num_watchers is 0).
382 */
383 if (num_watchers > 0) {
384 RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
385
b3b6e05e 386 ldpp_dout(dpp, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
522d829b
TL
387 << " cni=" << cni << dendl;
388 return robust_notify(dpp, notify_obj, cni, y);
adb31ebb
TL
389 }
390 return 0;
11fdf7f2
TL
391}
392
522d829b
TL
393int RGWSI_Notify::robust_notify(const DoutPrefixProvider *dpp,
394 RGWSI_RADOS::Obj& notify_obj,
395 const RGWCacheNotifyInfo& cni,
9f95a23c 396 optional_yield y)
11fdf7f2 397{
522d829b
TL
398 bufferlist bl;
399 encode(cni, bl);
11fdf7f2
TL
400
401 // First, try to send, without being fancy about it.
522d829b 402 auto r = notify_obj.notify(dpp, bl, 0, nullptr, y);
11fdf7f2 403
11fdf7f2 404 if (r < 0) {
522d829b
TL
405 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
406 << " Notify failed on object " << cni.obj << ": "
407 << cpp_strerror(-r) << dendl;
408 }
11fdf7f2 409
522d829b
TL
410 // If we timed out, get serious.
411 if (r == -ETIMEDOUT) {
412 RGWCacheNotifyInfo info;
20effc67 413 info.op = INVALIDATE_OBJ;
522d829b
TL
414 info.obj = cni.obj;
415 bufferlist retrybl;
416 encode(info, retrybl);
417
418 for (auto tries = 0u;
419 r == -ETIMEDOUT && tries < max_notify_retries;
420 ++tries) {
421 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
422 << " Invalidating obj=" << info.obj << " tries="
423 << tries << dendl;
424 r = notify_obj.notify(dpp, bl, 0, nullptr, y);
11fdf7f2 425 if (r < 0) {
522d829b
TL
426 ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
427 << " invalidation attempt " << tries << " failed: "
428 << cpp_strerror(-r) << dendl;
11fdf7f2
TL
429 }
430 }
431 }
432 return r;
433}
434
435void RGWSI_Notify::register_watch_cb(CB *_cb)
436{
9f95a23c 437 std::unique_lock l{watchers_lock};
11fdf7f2
TL
438 cb = _cb;
439 _set_enabled(enabled);
440}
441
442void RGWSI_Notify::schedule_context(Context *c)
443{
444 finisher_svc->schedule_context(c);
445}