1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
4 #include "include/types.h"
5 #include "messages/MWatchNotify.h"
10 #include "PrimaryLogPG.h"
14 #include "common/config.h"
16 #define dout_context osd->cct
17 #define dout_subsys ceph_subsys_osd
19 #define dout_prefix _prefix(_dout, this)
28 using ceph::bufferlist
;
// Context subclass that adds a pure-virtual cancel() hook. The timer
// callbacks defined later in this file (NotifyTimeoutCB, HandleWatchTimeout,
// HandleDelayedWatchTimeout) derive from it.
32 struct CancelableContext
: public Context
{
33 virtual void cancel() = 0;
// Log-prefix helper referenced by the dout_prefix macro above: forwards to
// notify->gen_dbg_prefix() on the supplied stream.
// NOTE(review): the parameter list is not visible in this excerpt.
37 static ostream
& _prefix(
40 return notify
->gen_dbg_prefix(*_dout
);
53 client_gid(client_gid
),
65 NotifyRef
Notify::makeNotifyRef(
// Timer callback for a Notify: when run, it invokes notif->do_timeout().
// NOTE(review): member declarations and some statements are not visible in
// this excerpt.
84 class NotifyTimeoutCB
: public CancelableContext
{
86 bool canceled
; // protected by notif lock
88 explicit NotifyTimeoutCB(NotifyRef notif
) : notif(notif
), canceled(false) {}
// finish(): releases osd->watch_lock, calls notif->do_timeout(), and
// re-acquires osd->watch_lock before returning.
89 void finish(int) override
{
90 notif
->osd
->watch_lock
.unlock();
93 notif
->do_timeout(); // drops lock
96 notif
->osd
->watch_lock
.lock();
// cancel(): the caller must already hold notif->lock (asserted below).
98 void cancel() override
{
99 ceph_assert(ceph_mutex_is_locked(notif
->lock
));
// Handles expiry of this notify's timeout. Asserts that `lock` is held on
// entry.
// NOTE(review): braces and the body of the is_discarded() branch are not
// visible in this excerpt.
104 void Notify::do_timeout()
106 ceph_assert(ceph_mutex_is_locked(lock
));
107 dout(10) << "timeout" << dendl
;
// A discarded notify takes a separate path (body not visible here).
109 if (is_discarded()) {
114 timed_out
= true; // we will send the client an error code
115 maybe_complete_notify();
// maybe_complete_notify() is expected to have set `complete` once timed_out
// is true.
116 ceph_assert(complete
);
// Move the still-outstanding watchers into a local set before walking them.
117 set
<WatchRef
> _watchers
;
118 _watchers
.swap(watchers
);
// For each watcher that had not replied: take a reference to its PG and,
// unless the watch has been discarded, tell it to cancel this notify.
121 for (auto i
= _watchers
.begin(); i
!= _watchers
.end(); ++i
) {
122 boost::intrusive_ptr
<PrimaryLogPG
> pg((*i
)->get_pg());
124 if (!(*i
)->is_discarded()) {
125 (*i
)->cancel_notify(self
.lock());
// Creates a NotifyTimeoutCB for this notify and schedules it on
// osd->watch_timer to fire after `timeout`. Requires `lock` to be held
// (asserted); osd->watch_lock is taken for the scope of the scheduling.
// NOTE(review): the handling of a failed add_event_after() is not visible
// in this excerpt.
131 void Notify::register_cb()
133 ceph_assert(ceph_mutex_is_locked(lock
));
135 std::lock_guard l
{osd
->watch_lock
};
136 cb
= new NotifyTimeoutCB(self
.lock());
137 if (!osd
->watch_timer
.add_event_after(timeout
, cb
)) {
// Cancels the pending timeout callback `cb` on osd->watch_timer while
// holding osd->watch_lock. Requires `lock` to be held (asserted).
// NOTE(review): statements between the assert and the lock_guard are not
// visible in this excerpt.
143 void Notify::unregister_cb()
145 ceph_assert(ceph_mutex_is_locked(lock
));
150 std::lock_guard l
{osd
->watch_lock
};
151 osd
->watch_timer
.cancel_event(cb
);
// Records `watch` in the set of watchers this notify is waiting on.
// Takes `lock` for the duration of the call.
156 void Notify::start_watcher(WatchRef watch
)
158 std::lock_guard
l(lock
);
159 dout(10) << "start_watcher" << dendl
;
160 watchers
.insert(watch
);
// Marks `watch` as having replied: removes it from `watchers`, adds an entry
// to notify_replies keyed by (watcher gid, cookie), then checks whether the
// notify can now be completed. Takes `lock` for the duration of the call.
// NOTE(review): the mapped value of the notify_replies entry (presumably
// reply_bl) is on a line not visible in this excerpt.
163 void Notify::complete_watcher(WatchRef watch
, bufferlist
& reply_bl
)
165 std::lock_guard
l(lock
);
166 dout(10) << "complete_watcher" << dendl
;
// The watch must currently be registered with this notify.
169 ceph_assert(watchers
.count(watch
));
170 watchers
.erase(watch
);
171 notify_replies
.insert(make_pair(make_pair(watch
->get_watcher_gid(),
172 watch
->get_cookie()),
174 maybe_complete_notify();
// Drops `watch` from the set of outstanding watchers without recording a
// reply for it, then checks whether the notify can now be completed.
// Takes `lock` for the duration of the call.
177 void Notify::complete_watcher_remove(WatchRef watch
)
179 std::lock_guard
l(lock
);
180 dout(10) << __func__
<< dendl
;
// The watch must currently be registered with this notify.
183 ceph_assert(watchers
.count(watch
));
184 watchers
.erase(watch
);
185 maybe_complete_notify();
// Completes the notify if no watchers remain outstanding or the timeout has
// fired: encodes the collected replies, lists the watchers that did not
// reply, and sends a NOTIFY_COMPLETE message to the client.
// NOTE(review): several lines (log arguments, MWatchNotify constructor
// arguments, the condition guarding the -ETIMEDOUT assignment, closing
// braces) are not visible in this excerpt.
188 void Notify::maybe_complete_notify()
190 dout(10) << "maybe_complete_notify -- "
192 << " in progress watchers " << dendl
;
193 if (watchers
.empty() || timed_out
) {
196 encode(notify_replies
, bl
);
// Collect (gid, cookie) for every watcher still outstanding, i.e. those
// that missed the notify.
197 vector
<pair
<uint64_t,uint64_t>> missed
;
198 missed
.reserve(watchers
.size());
199 for (auto& watcher
: watchers
) {
200 missed
.emplace_back(watcher
->get_watcher_gid(),
201 watcher
->get_cookie());
206 auto* const reply
= new MWatchNotify(
210 CEPH_WATCH_EVENT_NOTIFY_COMPLETE
,
// Error code reported to the client; presumably only set when timed_out is
// true — the guarding condition is not visible here.
215 reply
->return_code
= -ETIMEDOUT
;
216 client
->send_message(reply
);
// Discards this notify. Takes `lock` for the duration of the call.
// NOTE(review): the remaining statements are not visible in this excerpt.
223 void Notify::discard()
225 std::lock_guard
l(lock
);
233 std::lock_guard
l(lock
);
235 maybe_complete_notify();
238 #define dout_subsys ceph_subsys_osd
240 #define dout_prefix _prefix(_dout, watch.get())
// Log-prefix helper for the dout_prefix definition just above (which passes
// watch.get()): forwards to watch->gen_dbg_prefix() on the supplied stream.
// NOTE(review): the parameter list is not visible in this excerpt.
242 static ostream
& _prefix(
245 return watch
->gen_dbg_prefix(*_dout
);
// Timer callback for a Watch timeout. The work is done in complete() rather
// than finish(), because the callback deletes itself at a specific point
// (see the "~Watch requires pg lock" note below).
// NOTE(review): member declarations, the body of cancel(), and some
// statements of complete() are not visible in this excerpt.
248 class HandleWatchTimeout
: public CancelableContext
{
251 bool canceled
; // protected by watch->pg->lock
252 explicit HandleWatchTimeout(WatchRef watch
) : watch(watch
), canceled(false) {}
253 void cancel() override
{
// finish() must never be called; complete() is overridden instead.
256 void finish(int) override
{ ceph_abort(); /* not used */ }
// complete(): releases osd->watch_lock, invokes the PG's
// handle_watch_timeout() unless the watch was discarded or this callback was
// canceled, deletes this callback, and re-acquires osd->watch_lock.
257 void complete(int) override
{
258 OSDService
*osd(watch
->osd
);
259 ldout(osd
->cct
, 10) << "HandleWatchTimeout" << dendl
;
// Hold a reference to the PG in a local while the callback runs.
260 boost::intrusive_ptr
<PrimaryLogPG
> pg(watch
->pg
);
261 osd
->watch_lock
.unlock();
264 if (!watch
->is_discarded() && !canceled
)
265 watch
->pg
->handle_watch_timeout(watch
);
266 delete this; // ~Watch requires pg lock!
268 osd
->watch_lock
.lock();
// Variant of the watch-timeout callback that runs with the PG already locked
// (asserted in finish()).
// NOTE(review): member declarations and the body of cancel() are not visible
// in this excerpt.
272 class HandleDelayedWatchTimeout
: public CancelableContext
{
276 explicit HandleDelayedWatchTimeout(WatchRef watch
) : watch(watch
), canceled(false) {}
277 void cancel() override
{
// finish(): invokes the PG's handle_watch_timeout() unless the watch was
// discarded or this callback was canceled.
280 void finish(int) override
{
281 OSDService
*osd(watch
->osd
);
282 dout(10) << "HandleWatchTimeoutDelayed" << dendl
;
283 ceph_assert(watch
->pg
->is_locked());
285 if (!watch
->is_discarded() && !canceled
)
286 watch
->pg
->handle_watch_timeout(watch
);
290 #define dout_subsys ceph_subsys_osd
292 #define dout_prefix _prefix(_dout, this)
// Writes this watch's debug-log prefix to `out`: the PG's own prefix
// followed by " -- Watch(<cookie,entity>) ". Returns the stream.
294 std::ostream
& Watch::gen_dbg_prefix(std::ostream
& out
) {
295 return pg
->gen_prefix(out
) << " -- Watch("
296 << make_pair(cookie
, entity
) << ") ";
302 ObjectContextRef obc
,
305 entity_name_t entity
,
306 const entity_addr_t
&addr
)
317 dout(10) << "Watch()" << dendl
;
321 dout(10) << "~Watch" << dendl
;
322 // users must have called remove() or discard() prior to this point
324 ceph_assert(!is_connected());
// Creates a HandleDelayedWatchTimeout for this watch and stores it in `cb`.
// NOTE(review): the return statement and any precondition checks are not
// visible in this excerpt; presumably `cb` is what gets returned.
327 Context
*Watch::get_delayed_cb()
330 cb
= new HandleDelayedWatchTimeout(self
.lock());
// (Re)arms the watch timeout: under osd->watch_lock, cancels an existing
// timer event where applicable, then schedules a fresh HandleWatchTimeout to
// fire after `timeout`.
// NOTE(review): the condition choosing between the "re-registering" and
// "registering" paths, and the handling of a failed add_event_after(), are
// not visible in this excerpt.
334 void Watch::register_cb()
336 std::lock_guard
l(osd
->watch_lock
);
338 dout(15) << "re-registering callback, timeout: " << timeout
<< dendl
;
340 osd
->watch_timer
.cancel_event(cb
);
342 dout(15) << "registering callback, timeout: " << timeout
<< dendl
;
344 cb
= new HandleWatchTimeout(self
.lock());
345 if (!osd
->watch_timer
.add_event_after(timeout
, cb
)) {
// Cancels the watch's pending timeout callback on osd->watch_timer, taking
// osd->watch_lock around the cancellation.
// NOTE(review): the check that decides whether a callback is "actually
// registered" is not visible in this excerpt.
350 void Watch::unregister_cb()
352 dout(15) << "unregister_cb" << dendl
;
355 dout(15) << "actually registered, cancelling" << dendl
;
358 std::lock_guard
l(osd
->watch_lock
);
359 osd
->watch_timer
.cancel_event(cb
); // harmless if not registered with timer
// Handles a ping from the watcher carrying timestamp `t`.
// NOTE(review): only the is_connected() check is visible in this excerpt;
// the statements before and inside it are not shown.
364 void Watch::got_ping(utime_t t
)
367 if (is_connected()) {
// Attaches this watch to connection `con`: records the will_ping flag,
// registers the watch with the connection's Session watch state, re-sends
// every in-progress notify, and stamps last_ping with the current time.
// NOTE(review): the early-exit after "already connected", the assignment of
// the connection member, and loop/brace lines are not visible in this
// excerpt.
372 void Watch::connect(ConnectionRef con
, bool _will_ping
)
374 if (is_connected(con
.get())) {
375 dout(10) << __func__
<< " con " << con
<< " - already connected" << dendl
;
378 dout(10) << __func__
<< " con " << con
<< dendl
;
380 will_ping
= _will_ping
;
// The connection's private data is treated as the Session that tracks
// watches for this connection.
381 auto priv
= con
->get_priv();
383 auto sessionref
= static_cast<Session
*>(priv
.get());
384 sessionref
->wstate
.addWatch(self
.lock());
// Deliver any notifies that were already in progress for this watch.
386 for (auto i
= in_progress_notifies
.begin();
387 i
!= in_progress_notifies
.end();
389 send_notify(i
->second
);
393 last_ping
= ceph_clock_now();
// Logs the current connection and resets `conn` to an empty ConnectionRef.
// NOTE(review): any further statements are not visible in this excerpt.
400 void Watch::disconnect()
402 dout(10) << "disconnect (con was " << conn
<< ")" << dendl
;
403 conn
= ConnectionRef();
// Discards every notify currently in progress for this watch by calling
// Notify::discard() on each entry of in_progress_notifies.
// NOTE(review): the loop increment and any statements after the loop are
// not visible in this excerpt.
408 void Watch::discard()
410 dout(10) << "discard" << dendl
;
411 for (auto i
= in_progress_notifies
.begin();
412 i
!= in_progress_notifies
.end();
414 i
->second
->discard();
// Tears down this watch's state. Requires the PG to be locked and the watch
// not to have been discarded already (both asserted). Clears the in-progress
// notifies, detaches from the connection's Session watch state if connected,
// and drops the connection and object-context references.
// NOTE(review): some statements (e.g. where `discarded` is set) are not
// visible in this excerpt.
419 void Watch::discard_state()
421 ceph_assert(pg
->is_locked());
422 ceph_assert(!discarded
);
424 in_progress_notifies
.clear();
427 if (is_connected()) {
// Only touch the Session if the connection still has private data attached.
428 if (auto priv
= conn
->get_priv(); priv
) {
429 auto session
= static_cast<Session
*>(priv
.get());
430 session
->wstate
.removeWatch(self
.lock());
432 conn
= ConnectionRef();
434 obc
= ObjectContextRef();
437 bool Watch::is_discarded() const
// Removes this watch. If `send_disconnect` is true and the watch is
// connected, a CEPH_WATCH_EVENT_DISCONNECT message is sent to the client.
// Every in-progress notify is then told that this watcher is gone via
// complete_watcher_remove().
// NOTE(review): the declaration of `empty`, loop increments, and any
// trailing statements are not visible in this excerpt.
442 void Watch::remove(bool send_disconnect
)
444 dout(10) << "remove" << dendl
;
445 if (send_disconnect
&& is_connected()) {
447 MWatchNotify
*reply(new MWatchNotify(cookie
, 0, 0,
448 CEPH_WATCH_EVENT_DISCONNECT
, empty
));
449 conn
->send_message(reply
);
451 for (auto i
= in_progress_notifies
.begin();
452 i
!= in_progress_notifies
.end();
454 i
->second
->complete_watcher_remove(self
.lock());
// Begins tracking `notif` for this watch. The notify id must not already be
// tracked (asserted). If the last ping is older than (now - timeout) the
// watcher is treated as stale and the "disconnecting" path is logged;
// otherwise the notify is recorded in in_progress_notifies and this watch is
// registered with the notify as a watcher.
// NOTE(review): the statements executed on the stale path, any additional
// conditions on that branch, and anything after start_watcher() are not
// visible in this excerpt.
459 void Watch::start_notify(NotifyRef notif
)
461 ceph_assert(in_progress_notifies
.find(notif
->notify_id
) ==
462 in_progress_notifies
.end());
// cutoff = current time with `timeout` subtracted from its seconds field.
464 utime_t cutoff
= ceph_clock_now();
465 cutoff
.sec_ref() -= timeout
;
466 if (last_ping
< cutoff
) {
467 dout(10) << __func__
<< " " << notif
->notify_id
468 << " last_ping " << last_ping
<< " < cutoff " << cutoff
469 << ", disconnecting" << dendl
;
474 dout(10) << "start_notify " << notif
->notify_id
<< dendl
;
475 in_progress_notifies
[notif
->notify_id
] = notif
;
476 notif
->start_watcher(self
.lock());
// Stops tracking `notif` for this watch by erasing its notify_id from
// in_progress_notifies.
481 void Watch::cancel_notify(NotifyRef notif
)
483 dout(10) << "cancel_notify " << notif
->notify_id
<< dendl
;
484 in_progress_notifies
.erase(notif
->notify_id
);
// Builds a CEPH_WATCH_EVENT_NOTIFY message for `notif` and sends it over the
// watch's connection.
// NOTE(review): the remaining MWatchNotify constructor arguments and any
// connectivity check are not visible in this excerpt.
487 void Watch::send_notify(NotifyRef notif
)
489 dout(10) << "send_notify" << dendl
;
490 MWatchNotify
*notify_msg
= new MWatchNotify(
494 CEPH_WATCH_EVENT_NOTIFY
,
497 conn
->send_message(notify_msg
);
// Handles the client's acknowledgement of notify `notify_id`. If that notify
// is still tracked, the reply payload is passed to
// Notify::complete_watcher() and the notify is removed from
// in_progress_notifies; an unknown id is ignored by the visible code.
500 void Watch::notify_ack(uint64_t notify_id
, bufferlist
& reply_bl
)
502 dout(10) << "notify_ack" << dendl
;
503 auto i
= in_progress_notifies
.find(notify_id
);
504 if (i
!= in_progress_notifies
.end()) {
505 i
->second
->complete_watcher(self
.lock(), reply_bl
);
506 in_progress_notifies
.erase(i
);
// Factory: constructs a Watch from the given arguments and wraps it in a
// WatchRef.
// NOTE(review): the statements after construction (e.g. setting the
// self-reference and returning `ret`) are not visible in this excerpt.
510 WatchRef
Watch::makeWatchRef(
511 PrimaryLogPG
*pg
, OSDService
*osd
,
512 ObjectContextRef obc
, uint32_t timeout
, uint64_t cookie
, entity_name_t entity
, const entity_addr_t
& addr
)
514 WatchRef
ret(new Watch(pg
, osd
, obc
, timeout
, cookie
, entity
, addr
));
// Adds `watch` to the set of watches tracked for this connection, under
// `lock`.
519 void WatchConState::addWatch(WatchRef watch
)
521 std::lock_guard
l(lock
);
522 watches
.insert(watch
);
// Removes `watch` from the set of watches tracked for this connection, under
// `lock`.
525 void WatchConState::removeWatch(WatchRef watch
)
527 std::lock_guard
l(lock
);
528 watches
.erase(watch
);
531 void WatchConState::reset(Connection
*con
)
533 set
<WatchRef
> _watches
;
535 std::lock_guard
l(lock
);
536 _watches
.swap(watches
);
538 for (set
<WatchRef
>::iterator i
= _watches
.begin();
541 boost::intrusive_ptr
<PrimaryLogPG
> pg((*i
)->get_pg());
543 if (!(*i
)->is_discarded()) {
544 if ((*i
)->is_connected(con
)) {
547 lgeneric_derr(cct
) << __func__
<< " not still connected to " << (*i
) << dendl
;