]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/Watch.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / osd / Watch.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2#include "PG.h"
3
4#include "include/types.h"
5#include "messages/MWatchNotify.h"
6
7#include <map>
8
9#include "OSD.h"
10#include "PrimaryLogPG.h"
11#include "Watch.h"
12#include "Session.h"
13
14#include "common/config.h"
15
7c673cae
FG
16#define dout_context osd->cct
17#define dout_subsys ceph_subsys_osd
18#undef dout_prefix
19#define dout_prefix _prefix(_dout, this)
20
f67539c2
TL
21using std::list;
22using std::make_pair;
23using std::pair;
24using std::ostream;
25using std::set;
20effc67 26using std::vector;
f67539c2
TL
27
28using ceph::bufferlist;
29using ceph::decode;
30using ceph::encode;
31
32struct CancelableContext : public Context {
33 virtual void cancel() = 0;
34};
35
36
7c673cae
FG
37static ostream& _prefix(
38 std::ostream* _dout,
39 Notify *notify) {
11fdf7f2 40 return notify->gen_dbg_prefix(*_dout);
7c673cae
FG
41}
42
43Notify::Notify(
44 ConnectionRef client,
45 uint64_t client_gid,
46 bufferlist &payload,
47 uint32_t timeout,
48 uint64_t cookie,
49 uint64_t notify_id,
50 uint64_t version,
51 OSDService *osd)
9f95a23c
TL
52 : client(client),
53 client_gid(client_gid),
7c673cae
FG
54 complete(false),
55 discarded(false),
56 timed_out(false),
57 payload(payload),
58 timeout(timeout),
59 cookie(cookie),
60 notify_id(notify_id),
61 version(version),
62 osd(osd),
9f95a23c 63 cb(nullptr) {}
7c673cae
FG
64
65NotifyRef Notify::makeNotifyRef(
66 ConnectionRef client,
67 uint64_t client_gid,
68 bufferlist &payload,
69 uint32_t timeout,
70 uint64_t cookie,
71 uint64_t notify_id,
72 uint64_t version,
73 OSDService *osd) {
74 NotifyRef ret(
75 new Notify(
76 client, client_gid,
77 payload, timeout,
78 cookie, notify_id,
79 version, osd));
80 ret->set_self(ret);
81 return ret;
82}
83
84class NotifyTimeoutCB : public CancelableContext {
85 NotifyRef notif;
86 bool canceled; // protected by notif lock
87public:
88 explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
89 void finish(int) override {
9f95a23c
TL
90 notif->osd->watch_lock.unlock();
91 notif->lock.lock();
7c673cae
FG
92 if (!canceled)
93 notif->do_timeout(); // drops lock
94 else
9f95a23c
TL
95 notif->lock.unlock();
96 notif->osd->watch_lock.lock();
7c673cae
FG
97 }
98 void cancel() override {
9f95a23c 99 ceph_assert(ceph_mutex_is_locked(notif->lock));
7c673cae
FG
100 canceled = true;
101 }
102};
103
104void Notify::do_timeout()
105{
9f95a23c 106 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae 107 dout(10) << "timeout" << dendl;
11fdf7f2 108 cb = nullptr;
7c673cae 109 if (is_discarded()) {
9f95a23c 110 lock.unlock();
7c673cae
FG
111 return;
112 }
113
114 timed_out = true; // we will send the client an error code
115 maybe_complete_notify();
11fdf7f2 116 ceph_assert(complete);
7c673cae
FG
117 set<WatchRef> _watchers;
118 _watchers.swap(watchers);
9f95a23c 119 lock.unlock();
7c673cae 120
f67539c2 121 for (auto i = _watchers.begin(); i != _watchers.end(); ++i) {
7c673cae
FG
122 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
123 pg->lock();
124 if (!(*i)->is_discarded()) {
125 (*i)->cancel_notify(self.lock());
126 }
127 pg->unlock();
128 }
129}
130
131void Notify::register_cb()
132{
9f95a23c 133 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae 134 {
9f95a23c 135 std::lock_guard l{osd->watch_lock};
7c673cae 136 cb = new NotifyTimeoutCB(self.lock());
3efd9988
FG
137 if (!osd->watch_timer.add_event_after(timeout, cb)) {
138 cb = nullptr;
139 }
7c673cae
FG
140 }
141}
142
143void Notify::unregister_cb()
144{
9f95a23c 145 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
146 if (!cb)
147 return;
148 cb->cancel();
149 {
9f95a23c 150 std::lock_guard l{osd->watch_lock};
7c673cae 151 osd->watch_timer.cancel_event(cb);
11fdf7f2 152 cb = nullptr;
7c673cae
FG
153 }
154}
155
156void Notify::start_watcher(WatchRef watch)
157{
11fdf7f2 158 std::lock_guard l(lock);
7c673cae
FG
159 dout(10) << "start_watcher" << dendl;
160 watchers.insert(watch);
161}
162
163void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
164{
11fdf7f2 165 std::lock_guard l(lock);
7c673cae
FG
166 dout(10) << "complete_watcher" << dendl;
167 if (is_discarded())
168 return;
11fdf7f2 169 ceph_assert(watchers.count(watch));
7c673cae
FG
170 watchers.erase(watch);
171 notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
172 watch->get_cookie()),
173 reply_bl));
174 maybe_complete_notify();
175}
176
177void Notify::complete_watcher_remove(WatchRef watch)
178{
11fdf7f2 179 std::lock_guard l(lock);
7c673cae
FG
180 dout(10) << __func__ << dendl;
181 if (is_discarded())
182 return;
11fdf7f2 183 ceph_assert(watchers.count(watch));
7c673cae
FG
184 watchers.erase(watch);
185 maybe_complete_notify();
186}
187
188void Notify::maybe_complete_notify()
189{
190 dout(10) << "maybe_complete_notify -- "
191 << watchers.size()
192 << " in progress watchers " << dendl;
193 if (watchers.empty() || timed_out) {
194 // prepare reply
195 bufferlist bl;
11fdf7f2 196 encode(notify_replies, bl);
20effc67
TL
197 vector<pair<uint64_t,uint64_t>> missed;
198 missed.reserve(watchers.size());
199 for (auto& watcher : watchers) {
200 missed.emplace_back(watcher->get_watcher_gid(),
201 watcher->get_cookie());
7c673cae 202 }
11fdf7f2 203 encode(missed, bl);
7c673cae
FG
204
205 bufferlist empty;
9f95a23c
TL
206 auto* const reply = new MWatchNotify(
207 cookie,
208 version,
209 notify_id,
210 CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
211 empty,
212 client_gid);
7c673cae
FG
213 reply->set_data(bl);
214 if (timed_out)
215 reply->return_code = -ETIMEDOUT;
216 client->send_message(reply);
217 unregister_cb();
218
219 complete = true;
220 }
221}
222
223void Notify::discard()
224{
11fdf7f2 225 std::lock_guard l(lock);
7c673cae
FG
226 discarded = true;
227 unregister_cb();
228 watchers.clear();
229}
230
231void Notify::init()
232{
11fdf7f2 233 std::lock_guard l(lock);
7c673cae
FG
234 register_cb();
235 maybe_complete_notify();
236}
237
238#define dout_subsys ceph_subsys_osd
239#undef dout_prefix
240#define dout_prefix _prefix(_dout, watch.get())
241
242static ostream& _prefix(
243 std::ostream* _dout,
244 Watch *watch) {
11fdf7f2 245 return watch->gen_dbg_prefix(*_dout);
7c673cae
FG
246}
247
248class HandleWatchTimeout : public CancelableContext {
249 WatchRef watch;
250public:
251 bool canceled; // protected by watch->pg->lock
252 explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
253 void cancel() override {
254 canceled = true;
255 }
256 void finish(int) override { ceph_abort(); /* not used */ }
257 void complete(int) override {
258 OSDService *osd(watch->osd);
259 ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
260 boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
9f95a23c 261 osd->watch_lock.unlock();
7c673cae 262 pg->lock();
11fdf7f2 263 watch->cb = nullptr;
7c673cae
FG
264 if (!watch->is_discarded() && !canceled)
265 watch->pg->handle_watch_timeout(watch);
266 delete this; // ~Watch requires pg lock!
267 pg->unlock();
9f95a23c 268 osd->watch_lock.lock();
7c673cae
FG
269 }
270};
271
272class HandleDelayedWatchTimeout : public CancelableContext {
273 WatchRef watch;
274public:
275 bool canceled;
276 explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
277 void cancel() override {
278 canceled = true;
279 }
280 void finish(int) override {
281 OSDService *osd(watch->osd);
282 dout(10) << "HandleWatchTimeoutDelayed" << dendl;
11fdf7f2
TL
283 ceph_assert(watch->pg->is_locked());
284 watch->cb = nullptr;
7c673cae
FG
285 if (!watch->is_discarded() && !canceled)
286 watch->pg->handle_watch_timeout(watch);
287 }
288};
289
290#define dout_subsys ceph_subsys_osd
291#undef dout_prefix
292#define dout_prefix _prefix(_dout, this)
293
11fdf7f2
TL
294std::ostream& Watch::gen_dbg_prefix(std::ostream& out) {
295 return pg->gen_prefix(out) << " -- Watch("
296 << make_pair(cookie, entity) << ") ";
7c673cae
FG
297}
298
299Watch::Watch(
300 PrimaryLogPG *pg,
301 OSDService *osd,
302 ObjectContextRef obc,
303 uint32_t timeout,
304 uint64_t cookie,
305 entity_name_t entity,
306 const entity_addr_t &addr)
307 : cb(NULL),
308 osd(osd),
309 pg(pg),
310 obc(obc),
311 timeout(timeout),
312 cookie(cookie),
313 addr(addr),
314 will_ping(false),
315 entity(entity),
316 discarded(false) {
317 dout(10) << "Watch()" << dendl;
318}
319
320Watch::~Watch() {
321 dout(10) << "~Watch" << dendl;
322 // users must have called remove() or discard() prior to this point
11fdf7f2 323 ceph_assert(!obc);
9f95a23c 324 ceph_assert(!is_connected());
7c673cae
FG
325}
326
7c673cae
FG
327Context *Watch::get_delayed_cb()
328{
11fdf7f2 329 ceph_assert(!cb);
7c673cae
FG
330 cb = new HandleDelayedWatchTimeout(self.lock());
331 return cb;
332}
333
334void Watch::register_cb()
335{
11fdf7f2 336 std::lock_guard l(osd->watch_lock);
7c673cae
FG
337 if (cb) {
338 dout(15) << "re-registering callback, timeout: " << timeout << dendl;
339 cb->cancel();
340 osd->watch_timer.cancel_event(cb);
341 } else {
342 dout(15) << "registering callback, timeout: " << timeout << dendl;
343 }
344 cb = new HandleWatchTimeout(self.lock());
3efd9988
FG
345 if (!osd->watch_timer.add_event_after(timeout, cb)) {
346 cb = nullptr;
347 }
7c673cae
FG
348}
349
350void Watch::unregister_cb()
351{
352 dout(15) << "unregister_cb" << dendl;
353 if (!cb)
354 return;
355 dout(15) << "actually registered, cancelling" << dendl;
356 cb->cancel();
357 {
11fdf7f2 358 std::lock_guard l(osd->watch_lock);
7c673cae
FG
359 osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
360 }
11fdf7f2 361 cb = nullptr;
7c673cae
FG
362}
363
364void Watch::got_ping(utime_t t)
365{
366 last_ping = t;
9f95a23c 367 if (is_connected()) {
7c673cae
FG
368 register_cb();
369 }
370}
371
372void Watch::connect(ConnectionRef con, bool _will_ping)
373{
9f95a23c 374 if (is_connected(con.get())) {
7c673cae
FG
375 dout(10) << __func__ << " con " << con << " - already connected" << dendl;
376 return;
377 }
378 dout(10) << __func__ << " con " << con << dendl;
379 conn = con;
380 will_ping = _will_ping;
11fdf7f2
TL
381 auto priv = con->get_priv();
382 if (priv) {
383 auto sessionref = static_cast<Session*>(priv.get());
7c673cae 384 sessionref->wstate.addWatch(self.lock());
11fdf7f2 385 priv.reset();
f67539c2 386 for (auto i = in_progress_notifies.begin();
7c673cae
FG
387 i != in_progress_notifies.end();
388 ++i) {
389 send_notify(i->second);
390 }
391 }
392 if (will_ping) {
393 last_ping = ceph_clock_now();
394 register_cb();
395 } else {
396 unregister_cb();
397 }
398}
399
400void Watch::disconnect()
401{
402 dout(10) << "disconnect (con was " << conn << ")" << dendl;
403 conn = ConnectionRef();
404 if (!will_ping)
405 register_cb();
406}
407
408void Watch::discard()
409{
410 dout(10) << "discard" << dendl;
f67539c2 411 for (auto i = in_progress_notifies.begin();
7c673cae
FG
412 i != in_progress_notifies.end();
413 ++i) {
414 i->second->discard();
415 }
416 discard_state();
417}
418
419void Watch::discard_state()
420{
11fdf7f2
TL
421 ceph_assert(pg->is_locked());
422 ceph_assert(!discarded);
423 ceph_assert(obc);
7c673cae
FG
424 in_progress_notifies.clear();
425 unregister_cb();
426 discarded = true;
9f95a23c 427 if (is_connected()) {
11fdf7f2
TL
428 if (auto priv = conn->get_priv(); priv) {
429 auto session = static_cast<Session*>(priv.get());
430 session->wstate.removeWatch(self.lock());
7c673cae
FG
431 }
432 conn = ConnectionRef();
433 }
434 obc = ObjectContextRef();
435}
436
437bool Watch::is_discarded() const
438{
439 return discarded;
440}
441
442void Watch::remove(bool send_disconnect)
443{
444 dout(10) << "remove" << dendl;
9f95a23c 445 if (send_disconnect && is_connected()) {
7c673cae
FG
446 bufferlist empty;
447 MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
448 CEPH_WATCH_EVENT_DISCONNECT, empty));
449 conn->send_message(reply);
450 }
f67539c2 451 for (auto i = in_progress_notifies.begin();
7c673cae
FG
452 i != in_progress_notifies.end();
453 ++i) {
454 i->second->complete_watcher_remove(self.lock());
455 }
456 discard_state();
457}
458
459void Watch::start_notify(NotifyRef notif)
460{
11fdf7f2 461 ceph_assert(in_progress_notifies.find(notif->notify_id) ==
7c673cae
FG
462 in_progress_notifies.end());
463 if (will_ping) {
464 utime_t cutoff = ceph_clock_now();
465 cutoff.sec_ref() -= timeout;
466 if (last_ping < cutoff) {
467 dout(10) << __func__ << " " << notif->notify_id
468 << " last_ping " << last_ping << " < cutoff " << cutoff
469 << ", disconnecting" << dendl;
470 disconnect();
471 return;
472 }
473 }
474 dout(10) << "start_notify " << notif->notify_id << dendl;
475 in_progress_notifies[notif->notify_id] = notif;
476 notif->start_watcher(self.lock());
9f95a23c 477 if (is_connected())
7c673cae
FG
478 send_notify(notif);
479}
480
481void Watch::cancel_notify(NotifyRef notif)
482{
483 dout(10) << "cancel_notify " << notif->notify_id << dendl;
484 in_progress_notifies.erase(notif->notify_id);
485}
486
487void Watch::send_notify(NotifyRef notif)
488{
489 dout(10) << "send_notify" << dendl;
490 MWatchNotify *notify_msg = new MWatchNotify(
9f95a23c
TL
491 cookie,
492 notif->version,
493 notif->notify_id,
494 CEPH_WATCH_EVENT_NOTIFY,
495 notif->payload,
496 notif->client_gid);
7c673cae
FG
497 conn->send_message(notify_msg);
498}
499
500void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
501{
502 dout(10) << "notify_ack" << dendl;
f67539c2 503 auto i = in_progress_notifies.find(notify_id);
7c673cae
FG
504 if (i != in_progress_notifies.end()) {
505 i->second->complete_watcher(self.lock(), reply_bl);
506 in_progress_notifies.erase(i);
507 }
508}
509
510WatchRef Watch::makeWatchRef(
511 PrimaryLogPG *pg, OSDService *osd,
512 ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
513{
514 WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
515 ret->set_self(ret);
516 return ret;
517}
518
519void WatchConState::addWatch(WatchRef watch)
520{
11fdf7f2 521 std::lock_guard l(lock);
7c673cae
FG
522 watches.insert(watch);
523}
524
525void WatchConState::removeWatch(WatchRef watch)
526{
11fdf7f2 527 std::lock_guard l(lock);
7c673cae
FG
528 watches.erase(watch);
529}
530
531void WatchConState::reset(Connection *con)
532{
533 set<WatchRef> _watches;
534 {
11fdf7f2 535 std::lock_guard l(lock);
7c673cae
FG
536 _watches.swap(watches);
537 }
538 for (set<WatchRef>::iterator i = _watches.begin();
539 i != _watches.end();
540 ++i) {
541 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
542 pg->lock();
543 if (!(*i)->is_discarded()) {
544 if ((*i)->is_connected(con)) {
545 (*i)->disconnect();
546 } else {
547 lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
548 }
549 }
550 pg->unlock();
551 }
552}