]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/Watch.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / osd / Watch.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2#include "PG.h"
3
4#include "include/types.h"
5#include "messages/MWatchNotify.h"
6
7#include <map>
8
9#include "OSD.h"
10#include "PrimaryLogPG.h"
11#include "Watch.h"
12#include "Session.h"
13
14#include "common/config.h"
15
16struct CancelableContext : public Context {
17 virtual void cancel() = 0;
18};
19
20#define dout_context osd->cct
21#define dout_subsys ceph_subsys_osd
22#undef dout_prefix
23#define dout_prefix _prefix(_dout, this)
24
25static ostream& _prefix(
26 std::ostream* _dout,
27 Notify *notify) {
11fdf7f2 28 return notify->gen_dbg_prefix(*_dout);
7c673cae
FG
29}
30
31Notify::Notify(
32 ConnectionRef client,
33 uint64_t client_gid,
34 bufferlist &payload,
35 uint32_t timeout,
36 uint64_t cookie,
37 uint64_t notify_id,
38 uint64_t version,
39 OSDService *osd)
40 : client(client), client_gid(client_gid),
41 complete(false),
42 discarded(false),
43 timed_out(false),
44 payload(payload),
45 timeout(timeout),
46 cookie(cookie),
47 notify_id(notify_id),
48 version(version),
49 osd(osd),
50 cb(NULL),
51 lock("Notify::lock") {}
52
53NotifyRef Notify::makeNotifyRef(
54 ConnectionRef client,
55 uint64_t client_gid,
56 bufferlist &payload,
57 uint32_t timeout,
58 uint64_t cookie,
59 uint64_t notify_id,
60 uint64_t version,
61 OSDService *osd) {
62 NotifyRef ret(
63 new Notify(
64 client, client_gid,
65 payload, timeout,
66 cookie, notify_id,
67 version, osd));
68 ret->set_self(ret);
69 return ret;
70}
71
72class NotifyTimeoutCB : public CancelableContext {
73 NotifyRef notif;
74 bool canceled; // protected by notif lock
75public:
76 explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
77 void finish(int) override {
78 notif->osd->watch_lock.Unlock();
79 notif->lock.Lock();
80 if (!canceled)
81 notif->do_timeout(); // drops lock
82 else
83 notif->lock.Unlock();
84 notif->osd->watch_lock.Lock();
85 }
86 void cancel() override {
11fdf7f2 87 ceph_assert(notif->lock.is_locked_by_me());
7c673cae
FG
88 canceled = true;
89 }
90};
91
92void Notify::do_timeout()
93{
11fdf7f2 94 ceph_assert(lock.is_locked_by_me());
7c673cae 95 dout(10) << "timeout" << dendl;
11fdf7f2 96 cb = nullptr;
7c673cae
FG
97 if (is_discarded()) {
98 lock.Unlock();
99 return;
100 }
101
102 timed_out = true; // we will send the client an error code
103 maybe_complete_notify();
11fdf7f2 104 ceph_assert(complete);
7c673cae
FG
105 set<WatchRef> _watchers;
106 _watchers.swap(watchers);
107 lock.Unlock();
108
109 for (set<WatchRef>::iterator i = _watchers.begin();
110 i != _watchers.end();
111 ++i) {
112 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
113 pg->lock();
114 if (!(*i)->is_discarded()) {
115 (*i)->cancel_notify(self.lock());
116 }
117 pg->unlock();
118 }
119}
120
121void Notify::register_cb()
122{
11fdf7f2 123 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
124 {
125 osd->watch_lock.Lock();
126 cb = new NotifyTimeoutCB(self.lock());
3efd9988
FG
127 if (!osd->watch_timer.add_event_after(timeout, cb)) {
128 cb = nullptr;
129 }
7c673cae
FG
130 osd->watch_lock.Unlock();
131 }
132}
133
134void Notify::unregister_cb()
135{
11fdf7f2 136 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
137 if (!cb)
138 return;
139 cb->cancel();
140 {
141 osd->watch_lock.Lock();
142 osd->watch_timer.cancel_event(cb);
11fdf7f2 143 cb = nullptr;
7c673cae
FG
144 osd->watch_lock.Unlock();
145 }
146}
147
148void Notify::start_watcher(WatchRef watch)
149{
11fdf7f2 150 std::lock_guard l(lock);
7c673cae
FG
151 dout(10) << "start_watcher" << dendl;
152 watchers.insert(watch);
153}
154
155void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
156{
11fdf7f2 157 std::lock_guard l(lock);
7c673cae
FG
158 dout(10) << "complete_watcher" << dendl;
159 if (is_discarded())
160 return;
11fdf7f2 161 ceph_assert(watchers.count(watch));
7c673cae
FG
162 watchers.erase(watch);
163 notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
164 watch->get_cookie()),
165 reply_bl));
166 maybe_complete_notify();
167}
168
169void Notify::complete_watcher_remove(WatchRef watch)
170{
11fdf7f2 171 std::lock_guard l(lock);
7c673cae
FG
172 dout(10) << __func__ << dendl;
173 if (is_discarded())
174 return;
11fdf7f2 175 ceph_assert(watchers.count(watch));
7c673cae
FG
176 watchers.erase(watch);
177 maybe_complete_notify();
178}
179
180void Notify::maybe_complete_notify()
181{
182 dout(10) << "maybe_complete_notify -- "
183 << watchers.size()
184 << " in progress watchers " << dendl;
185 if (watchers.empty() || timed_out) {
186 // prepare reply
187 bufferlist bl;
11fdf7f2 188 encode(notify_replies, bl);
7c673cae
FG
189 list<pair<uint64_t,uint64_t> > missed;
190 for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
191 missed.push_back(make_pair((*p)->get_watcher_gid(),
192 (*p)->get_cookie()));
193 }
11fdf7f2 194 encode(missed, bl);
7c673cae
FG
195
196 bufferlist empty;
197 MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
198 CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
199 reply->notifier_gid = client_gid;
200 reply->set_data(bl);
201 if (timed_out)
202 reply->return_code = -ETIMEDOUT;
203 client->send_message(reply);
204 unregister_cb();
205
206 complete = true;
207 }
208}
209
210void Notify::discard()
211{
11fdf7f2 212 std::lock_guard l(lock);
7c673cae
FG
213 discarded = true;
214 unregister_cb();
215 watchers.clear();
216}
217
218void Notify::init()
219{
11fdf7f2 220 std::lock_guard l(lock);
7c673cae
FG
221 register_cb();
222 maybe_complete_notify();
223}
224
225#define dout_subsys ceph_subsys_osd
226#undef dout_prefix
227#define dout_prefix _prefix(_dout, watch.get())
228
229static ostream& _prefix(
230 std::ostream* _dout,
231 Watch *watch) {
11fdf7f2 232 return watch->gen_dbg_prefix(*_dout);
7c673cae
FG
233}
234
235class HandleWatchTimeout : public CancelableContext {
236 WatchRef watch;
237public:
238 bool canceled; // protected by watch->pg->lock
239 explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
240 void cancel() override {
241 canceled = true;
242 }
243 void finish(int) override { ceph_abort(); /* not used */ }
244 void complete(int) override {
245 OSDService *osd(watch->osd);
246 ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
247 boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
248 osd->watch_lock.Unlock();
249 pg->lock();
11fdf7f2 250 watch->cb = nullptr;
7c673cae
FG
251 if (!watch->is_discarded() && !canceled)
252 watch->pg->handle_watch_timeout(watch);
253 delete this; // ~Watch requires pg lock!
254 pg->unlock();
255 osd->watch_lock.Lock();
256 }
257};
258
259class HandleDelayedWatchTimeout : public CancelableContext {
260 WatchRef watch;
261public:
262 bool canceled;
263 explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
264 void cancel() override {
265 canceled = true;
266 }
267 void finish(int) override {
268 OSDService *osd(watch->osd);
269 dout(10) << "HandleWatchTimeoutDelayed" << dendl;
11fdf7f2
TL
270 ceph_assert(watch->pg->is_locked());
271 watch->cb = nullptr;
7c673cae
FG
272 if (!watch->is_discarded() && !canceled)
273 watch->pg->handle_watch_timeout(watch);
274 }
275};
276
277#define dout_subsys ceph_subsys_osd
278#undef dout_prefix
279#define dout_prefix _prefix(_dout, this)
280
11fdf7f2
TL
281std::ostream& Watch::gen_dbg_prefix(std::ostream& out) {
282 return pg->gen_prefix(out) << " -- Watch("
283 << make_pair(cookie, entity) << ") ";
7c673cae
FG
284}
285
286Watch::Watch(
287 PrimaryLogPG *pg,
288 OSDService *osd,
289 ObjectContextRef obc,
290 uint32_t timeout,
291 uint64_t cookie,
292 entity_name_t entity,
293 const entity_addr_t &addr)
294 : cb(NULL),
295 osd(osd),
296 pg(pg),
297 obc(obc),
298 timeout(timeout),
299 cookie(cookie),
300 addr(addr),
301 will_ping(false),
302 entity(entity),
303 discarded(false) {
304 dout(10) << "Watch()" << dendl;
305}
306
307Watch::~Watch() {
308 dout(10) << "~Watch" << dendl;
309 // users must have called remove() or discard() prior to this point
11fdf7f2
TL
310 ceph_assert(!obc);
311 ceph_assert(!conn);
7c673cae
FG
312}
313
314bool Watch::connected() { return !!conn; }
315
316Context *Watch::get_delayed_cb()
317{
11fdf7f2 318 ceph_assert(!cb);
7c673cae
FG
319 cb = new HandleDelayedWatchTimeout(self.lock());
320 return cb;
321}
322
323void Watch::register_cb()
324{
11fdf7f2 325 std::lock_guard l(osd->watch_lock);
7c673cae
FG
326 if (cb) {
327 dout(15) << "re-registering callback, timeout: " << timeout << dendl;
328 cb->cancel();
329 osd->watch_timer.cancel_event(cb);
330 } else {
331 dout(15) << "registering callback, timeout: " << timeout << dendl;
332 }
333 cb = new HandleWatchTimeout(self.lock());
3efd9988
FG
334 if (!osd->watch_timer.add_event_after(timeout, cb)) {
335 cb = nullptr;
336 }
7c673cae
FG
337}
338
339void Watch::unregister_cb()
340{
341 dout(15) << "unregister_cb" << dendl;
342 if (!cb)
343 return;
344 dout(15) << "actually registered, cancelling" << dendl;
345 cb->cancel();
346 {
11fdf7f2 347 std::lock_guard l(osd->watch_lock);
7c673cae
FG
348 osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
349 }
11fdf7f2 350 cb = nullptr;
7c673cae
FG
351}
352
353void Watch::got_ping(utime_t t)
354{
355 last_ping = t;
356 if (conn) {
357 register_cb();
358 }
359}
360
361void Watch::connect(ConnectionRef con, bool _will_ping)
362{
363 if (conn == con) {
364 dout(10) << __func__ << " con " << con << " - already connected" << dendl;
365 return;
366 }
367 dout(10) << __func__ << " con " << con << dendl;
368 conn = con;
369 will_ping = _will_ping;
11fdf7f2
TL
370 auto priv = con->get_priv();
371 if (priv) {
372 auto sessionref = static_cast<Session*>(priv.get());
7c673cae 373 sessionref->wstate.addWatch(self.lock());
11fdf7f2 374 priv.reset();
7c673cae
FG
375 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
376 i != in_progress_notifies.end();
377 ++i) {
378 send_notify(i->second);
379 }
380 }
381 if (will_ping) {
382 last_ping = ceph_clock_now();
383 register_cb();
384 } else {
385 unregister_cb();
386 }
387}
388
389void Watch::disconnect()
390{
391 dout(10) << "disconnect (con was " << conn << ")" << dendl;
392 conn = ConnectionRef();
393 if (!will_ping)
394 register_cb();
395}
396
397void Watch::discard()
398{
399 dout(10) << "discard" << dendl;
400 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
401 i != in_progress_notifies.end();
402 ++i) {
403 i->second->discard();
404 }
405 discard_state();
406}
407
408void Watch::discard_state()
409{
11fdf7f2
TL
410 ceph_assert(pg->is_locked());
411 ceph_assert(!discarded);
412 ceph_assert(obc);
7c673cae
FG
413 in_progress_notifies.clear();
414 unregister_cb();
415 discarded = true;
416 if (conn) {
11fdf7f2
TL
417 if (auto priv = conn->get_priv(); priv) {
418 auto session = static_cast<Session*>(priv.get());
419 session->wstate.removeWatch(self.lock());
7c673cae
FG
420 }
421 conn = ConnectionRef();
422 }
423 obc = ObjectContextRef();
424}
425
426bool Watch::is_discarded() const
427{
428 return discarded;
429}
430
431void Watch::remove(bool send_disconnect)
432{
433 dout(10) << "remove" << dendl;
434 if (send_disconnect && conn) {
435 bufferlist empty;
436 MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
437 CEPH_WATCH_EVENT_DISCONNECT, empty));
438 conn->send_message(reply);
439 }
440 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
441 i != in_progress_notifies.end();
442 ++i) {
443 i->second->complete_watcher_remove(self.lock());
444 }
445 discard_state();
446}
447
448void Watch::start_notify(NotifyRef notif)
449{
11fdf7f2 450 ceph_assert(in_progress_notifies.find(notif->notify_id) ==
7c673cae
FG
451 in_progress_notifies.end());
452 if (will_ping) {
453 utime_t cutoff = ceph_clock_now();
454 cutoff.sec_ref() -= timeout;
455 if (last_ping < cutoff) {
456 dout(10) << __func__ << " " << notif->notify_id
457 << " last_ping " << last_ping << " < cutoff " << cutoff
458 << ", disconnecting" << dendl;
459 disconnect();
460 return;
461 }
462 }
463 dout(10) << "start_notify " << notif->notify_id << dendl;
464 in_progress_notifies[notif->notify_id] = notif;
465 notif->start_watcher(self.lock());
466 if (connected())
467 send_notify(notif);
468}
469
470void Watch::cancel_notify(NotifyRef notif)
471{
472 dout(10) << "cancel_notify " << notif->notify_id << dendl;
473 in_progress_notifies.erase(notif->notify_id);
474}
475
476void Watch::send_notify(NotifyRef notif)
477{
478 dout(10) << "send_notify" << dendl;
479 MWatchNotify *notify_msg = new MWatchNotify(
480 cookie, notif->version, notif->notify_id,
481 CEPH_WATCH_EVENT_NOTIFY, notif->payload);
482 notify_msg->notifier_gid = notif->client_gid;
483 conn->send_message(notify_msg);
484}
485
486void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
487{
488 dout(10) << "notify_ack" << dendl;
489 map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
490 if (i != in_progress_notifies.end()) {
491 i->second->complete_watcher(self.lock(), reply_bl);
492 in_progress_notifies.erase(i);
493 }
494}
495
496WatchRef Watch::makeWatchRef(
497 PrimaryLogPG *pg, OSDService *osd,
498 ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
499{
500 WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
501 ret->set_self(ret);
502 return ret;
503}
504
505void WatchConState::addWatch(WatchRef watch)
506{
11fdf7f2 507 std::lock_guard l(lock);
7c673cae
FG
508 watches.insert(watch);
509}
510
511void WatchConState::removeWatch(WatchRef watch)
512{
11fdf7f2 513 std::lock_guard l(lock);
7c673cae
FG
514 watches.erase(watch);
515}
516
517void WatchConState::reset(Connection *con)
518{
519 set<WatchRef> _watches;
520 {
11fdf7f2 521 std::lock_guard l(lock);
7c673cae
FG
522 _watches.swap(watches);
523 }
524 for (set<WatchRef>::iterator i = _watches.begin();
525 i != _watches.end();
526 ++i) {
527 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
528 pg->lock();
529 if (!(*i)->is_discarded()) {
530 if ((*i)->is_connected(con)) {
531 (*i)->disconnect();
532 } else {
533 lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
534 }
535 }
536 pg->unlock();
537 }
538}