1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
4 #include "include/types.h"
5 #include "messages/MWatchNotify.h"
10 #include "PrimaryLogPG.h"
14 #include "common/config.h"
16 struct CancelableContext
: public Context
{
17 virtual void cancel() = 0;
20 #define dout_context osd->cct
21 #define dout_subsys ceph_subsys_osd
23 #define dout_prefix _prefix(_dout, this)
25 static ostream
& _prefix(
28 return *_dout
<< notify
->gen_dbg_prefix();
40 : client(client
), client_gid(client_gid
),
51 lock("Notify::lock") {}
53 NotifyRef
Notify::makeNotifyRef(
72 class NotifyTimeoutCB
: public CancelableContext
{
74 bool canceled
; // protected by notif lock
76 explicit NotifyTimeoutCB(NotifyRef notif
) : notif(notif
), canceled(false) {}
77 void finish(int) override
{
78 notif
->osd
->watch_lock
.Unlock();
81 notif
->do_timeout(); // drops lock
84 notif
->osd
->watch_lock
.Lock();
86 void cancel() override
{
87 assert(notif
->lock
.is_locked_by_me());
92 void Notify::do_timeout()
94 assert(lock
.is_locked_by_me());
95 dout(10) << "timeout" << dendl
;
102 timed_out
= true; // we will send the client an error code
103 maybe_complete_notify();
105 set
<WatchRef
> _watchers
;
106 _watchers
.swap(watchers
);
109 for (set
<WatchRef
>::iterator i
= _watchers
.begin();
110 i
!= _watchers
.end();
112 boost::intrusive_ptr
<PrimaryLogPG
> pg((*i
)->get_pg());
114 if (!(*i
)->is_discarded()) {
115 (*i
)->cancel_notify(self
.lock());
121 void Notify::register_cb()
123 assert(lock
.is_locked_by_me());
125 osd
->watch_lock
.Lock();
126 cb
= new NotifyTimeoutCB(self
.lock());
127 osd
->watch_timer
.add_event_after(
130 osd
->watch_lock
.Unlock();
134 void Notify::unregister_cb()
136 assert(lock
.is_locked_by_me());
141 osd
->watch_lock
.Lock();
142 osd
->watch_timer
.cancel_event(cb
);
144 osd
->watch_lock
.Unlock();
148 void Notify::start_watcher(WatchRef watch
)
150 Mutex::Locker
l(lock
);
151 dout(10) << "start_watcher" << dendl
;
152 watchers
.insert(watch
);
155 void Notify::complete_watcher(WatchRef watch
, bufferlist
& reply_bl
)
157 Mutex::Locker
l(lock
);
158 dout(10) << "complete_watcher" << dendl
;
161 assert(watchers
.count(watch
));
162 watchers
.erase(watch
);
163 notify_replies
.insert(make_pair(make_pair(watch
->get_watcher_gid(),
164 watch
->get_cookie()),
166 maybe_complete_notify();
169 void Notify::complete_watcher_remove(WatchRef watch
)
171 Mutex::Locker
l(lock
);
172 dout(10) << __func__
<< dendl
;
175 assert(watchers
.count(watch
));
176 watchers
.erase(watch
);
177 maybe_complete_notify();
180 void Notify::maybe_complete_notify()
182 dout(10) << "maybe_complete_notify -- "
184 << " in progress watchers " << dendl
;
185 if (watchers
.empty() || timed_out
) {
188 ::encode(notify_replies
, bl
);
189 list
<pair
<uint64_t,uint64_t> > missed
;
190 for (set
<WatchRef
>::iterator p
= watchers
.begin(); p
!= watchers
.end(); ++p
) {
191 missed
.push_back(make_pair((*p
)->get_watcher_gid(),
192 (*p
)->get_cookie()));
194 ::encode(missed
, bl
);
197 MWatchNotify
*reply(new MWatchNotify(cookie
, version
, notify_id
,
198 CEPH_WATCH_EVENT_NOTIFY_COMPLETE
, empty
));
199 reply
->notifier_gid
= client_gid
;
202 reply
->return_code
= -ETIMEDOUT
;
203 client
->send_message(reply
);
210 void Notify::discard()
212 Mutex::Locker
l(lock
);
220 Mutex::Locker
l(lock
);
222 maybe_complete_notify();
225 #define dout_subsys ceph_subsys_osd
227 #define dout_prefix _prefix(_dout, watch.get())
229 static ostream
& _prefix(
232 return *_dout
<< watch
->gen_dbg_prefix();
235 class HandleWatchTimeout
: public CancelableContext
{
238 bool canceled
; // protected by watch->pg->lock
239 explicit HandleWatchTimeout(WatchRef watch
) : watch(watch
), canceled(false) {}
240 void cancel() override
{
// Aborts unconditionally if ever called; this callback overrides complete(int) instead of doing its work in finish().
243 void finish(int) override
{ ceph_abort(); /* not used */ }
244 void complete(int) override
{
245 OSDService
*osd(watch
->osd
);
246 ldout(osd
->cct
, 10) << "HandleWatchTimeout" << dendl
;
247 boost::intrusive_ptr
<PrimaryLogPG
> pg(watch
->pg
);
248 osd
->watch_lock
.Unlock();
251 if (!watch
->is_discarded() && !canceled
)
252 watch
->pg
->handle_watch_timeout(watch
);
253 delete this; // ~Watch requires pg lock!
255 osd
->watch_lock
.Lock();
259 class HandleDelayedWatchTimeout
: public CancelableContext
{
263 explicit HandleDelayedWatchTimeout(WatchRef watch
) : watch(watch
), canceled(false) {}
264 void cancel() override
{
267 void finish(int) override
{
268 OSDService
*osd(watch
->osd
);
269 dout(10) << "HandleWatchTimeoutDelayed" << dendl
;
270 assert(watch
->pg
->is_locked());
272 if (!watch
->is_discarded() && !canceled
)
273 watch
->pg
->handle_watch_timeout(watch
);
277 #define dout_subsys ceph_subsys_osd
279 #define dout_prefix _prefix(_dout, this)
281 string
Watch::gen_dbg_prefix() {
283 ss
<< pg
->gen_prefix() << " -- Watch("
284 << make_pair(cookie
, entity
) << ") ";
291 ObjectContextRef obc
,
294 entity_name_t entity
,
295 const entity_addr_t
&addr
)
306 dout(10) << "Watch()" << dendl
;
310 dout(10) << "~Watch" << dendl
;
311 // users must have called remove() or discard() prior to this point
// Returns true when this watch currently has a connection set (i.e. `conn` converts to true).
316 bool Watch::connected() { return !!conn
; }
318 Context
*Watch::get_delayed_cb()
321 cb
= new HandleDelayedWatchTimeout(self
.lock());
325 void Watch::register_cb()
327 Mutex::Locker
l(osd
->watch_lock
);
329 dout(15) << "re-registering callback, timeout: " << timeout
<< dendl
;
331 osd
->watch_timer
.cancel_event(cb
);
333 dout(15) << "registering callback, timeout: " << timeout
<< dendl
;
335 cb
= new HandleWatchTimeout(self
.lock());
336 osd
->watch_timer
.add_event_after(
341 void Watch::unregister_cb()
343 dout(15) << "unregister_cb" << dendl
;
346 dout(15) << "actually registered, cancelling" << dendl
;
349 Mutex::Locker
l(osd
->watch_lock
);
350 osd
->watch_timer
.cancel_event(cb
); // harmless if not registered with timer
355 void Watch::got_ping(utime_t t
)
363 void Watch::connect(ConnectionRef con
, bool _will_ping
)
366 dout(10) << __func__
<< " con " << con
<< " - already connected" << dendl
;
369 dout(10) << __func__
<< " con " << con
<< dendl
;
371 will_ping
= _will_ping
;
372 Session
* sessionref(static_cast<Session
*>(con
->get_priv()));
374 sessionref
->wstate
.addWatch(self
.lock());
376 for (map
<uint64_t, NotifyRef
>::iterator i
= in_progress_notifies
.begin();
377 i
!= in_progress_notifies
.end();
379 send_notify(i
->second
);
383 last_ping
= ceph_clock_now();
390 void Watch::disconnect()
392 dout(10) << "disconnect (con was " << conn
<< ")" << dendl
;
393 conn
= ConnectionRef();
398 void Watch::discard()
400 dout(10) << "discard" << dendl
;
401 for (map
<uint64_t, NotifyRef
>::iterator i
= in_progress_notifies
.begin();
402 i
!= in_progress_notifies
.end();
404 i
->second
->discard();
409 void Watch::discard_state()
411 assert(pg
->is_locked());
414 in_progress_notifies
.clear();
418 Session
* sessionref(static_cast<Session
*>(conn
->get_priv()));
420 sessionref
->wstate
.removeWatch(self
.lock());
423 conn
= ConnectionRef();
425 obc
= ObjectContextRef();
428 bool Watch::is_discarded() const
433 void Watch::remove(bool send_disconnect
)
435 dout(10) << "remove" << dendl
;
436 if (send_disconnect
&& conn
) {
438 MWatchNotify
*reply(new MWatchNotify(cookie
, 0, 0,
439 CEPH_WATCH_EVENT_DISCONNECT
, empty
));
440 conn
->send_message(reply
);
442 for (map
<uint64_t, NotifyRef
>::iterator i
= in_progress_notifies
.begin();
443 i
!= in_progress_notifies
.end();
445 i
->second
->complete_watcher_remove(self
.lock());
450 void Watch::start_notify(NotifyRef notif
)
452 assert(in_progress_notifies
.find(notif
->notify_id
) ==
453 in_progress_notifies
.end());
455 utime_t cutoff
= ceph_clock_now();
456 cutoff
.sec_ref() -= timeout
;
457 if (last_ping
< cutoff
) {
458 dout(10) << __func__
<< " " << notif
->notify_id
459 << " last_ping " << last_ping
<< " < cutoff " << cutoff
460 << ", disconnecting" << dendl
;
465 dout(10) << "start_notify " << notif
->notify_id
<< dendl
;
466 in_progress_notifies
[notif
->notify_id
] = notif
;
467 notif
->start_watcher(self
.lock());
472 void Watch::cancel_notify(NotifyRef notif
)
474 dout(10) << "cancel_notify " << notif
->notify_id
<< dendl
;
475 in_progress_notifies
.erase(notif
->notify_id
);
478 void Watch::send_notify(NotifyRef notif
)
480 dout(10) << "send_notify" << dendl
;
481 MWatchNotify
*notify_msg
= new MWatchNotify(
482 cookie
, notif
->version
, notif
->notify_id
,
483 CEPH_WATCH_EVENT_NOTIFY
, notif
->payload
);
484 notify_msg
->notifier_gid
= notif
->client_gid
;
485 conn
->send_message(notify_msg
);
488 void Watch::notify_ack(uint64_t notify_id
, bufferlist
& reply_bl
)
490 dout(10) << "notify_ack" << dendl
;
491 map
<uint64_t, NotifyRef
>::iterator i
= in_progress_notifies
.find(notify_id
);
492 if (i
!= in_progress_notifies
.end()) {
493 i
->second
->complete_watcher(self
.lock(), reply_bl
);
494 in_progress_notifies
.erase(i
);
498 WatchRef
Watch::makeWatchRef(
499 PrimaryLogPG
*pg
, OSDService
*osd
,
500 ObjectContextRef obc
, uint32_t timeout
, uint64_t cookie
, entity_name_t entity
, const entity_addr_t
& addr
)
502 WatchRef
ret(new Watch(pg
, osd
, obc
, timeout
, cookie
, entity
, addr
));
507 void WatchConState::addWatch(WatchRef watch
)
509 Mutex::Locker
l(lock
);
510 watches
.insert(watch
);
513 void WatchConState::removeWatch(WatchRef watch
)
515 Mutex::Locker
l(lock
);
516 watches
.erase(watch
);
519 void WatchConState::reset(Connection
*con
)
521 set
<WatchRef
> _watches
;
523 Mutex::Locker
l(lock
);
524 _watches
.swap(watches
);
526 for (set
<WatchRef
>::iterator i
= _watches
.begin();
529 boost::intrusive_ptr
<PrimaryLogPG
> pg((*i
)->get_pg());
531 if (!(*i
)->is_discarded()) {
532 if ((*i
)->is_connected(con
)) {
535 lgeneric_derr(cct
) << __func__
<< " not still connected to " << (*i
) << dendl
;