]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/Watch.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / osd / Watch.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2#include "PG.h"
3
4#include "include/types.h"
5#include "messages/MWatchNotify.h"
6
7#include <map>
8
9#include "OSD.h"
10#include "PrimaryLogPG.h"
11#include "Watch.h"
12#include "Session.h"
13
14#include "common/config.h"
15
7c673cae
FG
16#define dout_context osd->cct
17#define dout_subsys ceph_subsys_osd
18#undef dout_prefix
19#define dout_prefix _prefix(_dout, this)
20
f67539c2
TL
21using std::list;
22using std::make_pair;
23using std::pair;
24using std::ostream;
25using std::set;
26
27using ceph::bufferlist;
28using ceph::decode;
29using ceph::encode;
30
31struct CancelableContext : public Context {
32 virtual void cancel() = 0;
33};
34
35
7c673cae
FG
36static ostream& _prefix(
37 std::ostream* _dout,
38 Notify *notify) {
11fdf7f2 39 return notify->gen_dbg_prefix(*_dout);
7c673cae
FG
40}
41
42Notify::Notify(
43 ConnectionRef client,
44 uint64_t client_gid,
45 bufferlist &payload,
46 uint32_t timeout,
47 uint64_t cookie,
48 uint64_t notify_id,
49 uint64_t version,
50 OSDService *osd)
9f95a23c
TL
51 : client(client),
52 client_gid(client_gid),
7c673cae
FG
53 complete(false),
54 discarded(false),
55 timed_out(false),
56 payload(payload),
57 timeout(timeout),
58 cookie(cookie),
59 notify_id(notify_id),
60 version(version),
61 osd(osd),
9f95a23c 62 cb(nullptr) {}
7c673cae
FG
63
64NotifyRef Notify::makeNotifyRef(
65 ConnectionRef client,
66 uint64_t client_gid,
67 bufferlist &payload,
68 uint32_t timeout,
69 uint64_t cookie,
70 uint64_t notify_id,
71 uint64_t version,
72 OSDService *osd) {
73 NotifyRef ret(
74 new Notify(
75 client, client_gid,
76 payload, timeout,
77 cookie, notify_id,
78 version, osd));
79 ret->set_self(ret);
80 return ret;
81}
82
83class NotifyTimeoutCB : public CancelableContext {
84 NotifyRef notif;
85 bool canceled; // protected by notif lock
86public:
87 explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
88 void finish(int) override {
9f95a23c
TL
89 notif->osd->watch_lock.unlock();
90 notif->lock.lock();
7c673cae
FG
91 if (!canceled)
92 notif->do_timeout(); // drops lock
93 else
9f95a23c
TL
94 notif->lock.unlock();
95 notif->osd->watch_lock.lock();
7c673cae
FG
96 }
97 void cancel() override {
9f95a23c 98 ceph_assert(ceph_mutex_is_locked(notif->lock));
7c673cae
FG
99 canceled = true;
100 }
101};
102
103void Notify::do_timeout()
104{
9f95a23c 105 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae 106 dout(10) << "timeout" << dendl;
11fdf7f2 107 cb = nullptr;
7c673cae 108 if (is_discarded()) {
9f95a23c 109 lock.unlock();
7c673cae
FG
110 return;
111 }
112
113 timed_out = true; // we will send the client an error code
114 maybe_complete_notify();
11fdf7f2 115 ceph_assert(complete);
7c673cae
FG
116 set<WatchRef> _watchers;
117 _watchers.swap(watchers);
9f95a23c 118 lock.unlock();
7c673cae 119
f67539c2 120 for (auto i = _watchers.begin(); i != _watchers.end(); ++i) {
7c673cae
FG
121 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
122 pg->lock();
123 if (!(*i)->is_discarded()) {
124 (*i)->cancel_notify(self.lock());
125 }
126 pg->unlock();
127 }
128}
129
130void Notify::register_cb()
131{
9f95a23c 132 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae 133 {
9f95a23c 134 std::lock_guard l{osd->watch_lock};
7c673cae 135 cb = new NotifyTimeoutCB(self.lock());
3efd9988
FG
136 if (!osd->watch_timer.add_event_after(timeout, cb)) {
137 cb = nullptr;
138 }
7c673cae
FG
139 }
140}
141
142void Notify::unregister_cb()
143{
9f95a23c 144 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
145 if (!cb)
146 return;
147 cb->cancel();
148 {
9f95a23c 149 std::lock_guard l{osd->watch_lock};
7c673cae 150 osd->watch_timer.cancel_event(cb);
11fdf7f2 151 cb = nullptr;
7c673cae
FG
152 }
153}
154
155void Notify::start_watcher(WatchRef watch)
156{
11fdf7f2 157 std::lock_guard l(lock);
7c673cae
FG
158 dout(10) << "start_watcher" << dendl;
159 watchers.insert(watch);
160}
161
162void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
163{
11fdf7f2 164 std::lock_guard l(lock);
7c673cae
FG
165 dout(10) << "complete_watcher" << dendl;
166 if (is_discarded())
167 return;
11fdf7f2 168 ceph_assert(watchers.count(watch));
7c673cae
FG
169 watchers.erase(watch);
170 notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
171 watch->get_cookie()),
172 reply_bl));
173 maybe_complete_notify();
174}
175
176void Notify::complete_watcher_remove(WatchRef watch)
177{
11fdf7f2 178 std::lock_guard l(lock);
7c673cae
FG
179 dout(10) << __func__ << dendl;
180 if (is_discarded())
181 return;
11fdf7f2 182 ceph_assert(watchers.count(watch));
7c673cae
FG
183 watchers.erase(watch);
184 maybe_complete_notify();
185}
186
187void Notify::maybe_complete_notify()
188{
189 dout(10) << "maybe_complete_notify -- "
190 << watchers.size()
191 << " in progress watchers " << dendl;
192 if (watchers.empty() || timed_out) {
193 // prepare reply
194 bufferlist bl;
11fdf7f2 195 encode(notify_replies, bl);
7c673cae 196 list<pair<uint64_t,uint64_t> > missed;
f67539c2 197 for (auto p = watchers.begin(); p != watchers.end(); ++p) {
7c673cae
FG
198 missed.push_back(make_pair((*p)->get_watcher_gid(),
199 (*p)->get_cookie()));
200 }
11fdf7f2 201 encode(missed, bl);
7c673cae
FG
202
203 bufferlist empty;
9f95a23c
TL
204 auto* const reply = new MWatchNotify(
205 cookie,
206 version,
207 notify_id,
208 CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
209 empty,
210 client_gid);
7c673cae
FG
211 reply->set_data(bl);
212 if (timed_out)
213 reply->return_code = -ETIMEDOUT;
214 client->send_message(reply);
215 unregister_cb();
216
217 complete = true;
218 }
219}
220
221void Notify::discard()
222{
11fdf7f2 223 std::lock_guard l(lock);
7c673cae
FG
224 discarded = true;
225 unregister_cb();
226 watchers.clear();
227}
228
229void Notify::init()
230{
11fdf7f2 231 std::lock_guard l(lock);
7c673cae
FG
232 register_cb();
233 maybe_complete_notify();
234}
235
236#define dout_subsys ceph_subsys_osd
237#undef dout_prefix
238#define dout_prefix _prefix(_dout, watch.get())
239
240static ostream& _prefix(
241 std::ostream* _dout,
242 Watch *watch) {
11fdf7f2 243 return watch->gen_dbg_prefix(*_dout);
7c673cae
FG
244}
245
246class HandleWatchTimeout : public CancelableContext {
247 WatchRef watch;
248public:
249 bool canceled; // protected by watch->pg->lock
250 explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
251 void cancel() override {
252 canceled = true;
253 }
254 void finish(int) override { ceph_abort(); /* not used */ }
255 void complete(int) override {
256 OSDService *osd(watch->osd);
257 ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
258 boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
9f95a23c 259 osd->watch_lock.unlock();
7c673cae 260 pg->lock();
11fdf7f2 261 watch->cb = nullptr;
7c673cae
FG
262 if (!watch->is_discarded() && !canceled)
263 watch->pg->handle_watch_timeout(watch);
264 delete this; // ~Watch requires pg lock!
265 pg->unlock();
9f95a23c 266 osd->watch_lock.lock();
7c673cae
FG
267 }
268};
269
270class HandleDelayedWatchTimeout : public CancelableContext {
271 WatchRef watch;
272public:
273 bool canceled;
274 explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
275 void cancel() override {
276 canceled = true;
277 }
278 void finish(int) override {
279 OSDService *osd(watch->osd);
280 dout(10) << "HandleWatchTimeoutDelayed" << dendl;
11fdf7f2
TL
281 ceph_assert(watch->pg->is_locked());
282 watch->cb = nullptr;
7c673cae
FG
283 if (!watch->is_discarded() && !canceled)
284 watch->pg->handle_watch_timeout(watch);
285 }
286};
287
288#define dout_subsys ceph_subsys_osd
289#undef dout_prefix
290#define dout_prefix _prefix(_dout, this)
291
11fdf7f2
TL
292std::ostream& Watch::gen_dbg_prefix(std::ostream& out) {
293 return pg->gen_prefix(out) << " -- Watch("
294 << make_pair(cookie, entity) << ") ";
7c673cae
FG
295}
296
297Watch::Watch(
298 PrimaryLogPG *pg,
299 OSDService *osd,
300 ObjectContextRef obc,
301 uint32_t timeout,
302 uint64_t cookie,
303 entity_name_t entity,
304 const entity_addr_t &addr)
305 : cb(NULL),
306 osd(osd),
307 pg(pg),
308 obc(obc),
309 timeout(timeout),
310 cookie(cookie),
311 addr(addr),
312 will_ping(false),
313 entity(entity),
314 discarded(false) {
315 dout(10) << "Watch()" << dendl;
316}
317
318Watch::~Watch() {
319 dout(10) << "~Watch" << dendl;
320 // users must have called remove() or discard() prior to this point
11fdf7f2 321 ceph_assert(!obc);
9f95a23c 322 ceph_assert(!is_connected());
7c673cae
FG
323}
324
7c673cae
FG
325Context *Watch::get_delayed_cb()
326{
11fdf7f2 327 ceph_assert(!cb);
7c673cae
FG
328 cb = new HandleDelayedWatchTimeout(self.lock());
329 return cb;
330}
331
332void Watch::register_cb()
333{
11fdf7f2 334 std::lock_guard l(osd->watch_lock);
7c673cae
FG
335 if (cb) {
336 dout(15) << "re-registering callback, timeout: " << timeout << dendl;
337 cb->cancel();
338 osd->watch_timer.cancel_event(cb);
339 } else {
340 dout(15) << "registering callback, timeout: " << timeout << dendl;
341 }
342 cb = new HandleWatchTimeout(self.lock());
3efd9988
FG
343 if (!osd->watch_timer.add_event_after(timeout, cb)) {
344 cb = nullptr;
345 }
7c673cae
FG
346}
347
348void Watch::unregister_cb()
349{
350 dout(15) << "unregister_cb" << dendl;
351 if (!cb)
352 return;
353 dout(15) << "actually registered, cancelling" << dendl;
354 cb->cancel();
355 {
11fdf7f2 356 std::lock_guard l(osd->watch_lock);
7c673cae
FG
357 osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
358 }
11fdf7f2 359 cb = nullptr;
7c673cae
FG
360}
361
362void Watch::got_ping(utime_t t)
363{
364 last_ping = t;
9f95a23c 365 if (is_connected()) {
7c673cae
FG
366 register_cb();
367 }
368}
369
370void Watch::connect(ConnectionRef con, bool _will_ping)
371{
9f95a23c 372 if (is_connected(con.get())) {
7c673cae
FG
373 dout(10) << __func__ << " con " << con << " - already connected" << dendl;
374 return;
375 }
376 dout(10) << __func__ << " con " << con << dendl;
377 conn = con;
378 will_ping = _will_ping;
11fdf7f2
TL
379 auto priv = con->get_priv();
380 if (priv) {
381 auto sessionref = static_cast<Session*>(priv.get());
7c673cae 382 sessionref->wstate.addWatch(self.lock());
11fdf7f2 383 priv.reset();
f67539c2 384 for (auto i = in_progress_notifies.begin();
7c673cae
FG
385 i != in_progress_notifies.end();
386 ++i) {
387 send_notify(i->second);
388 }
389 }
390 if (will_ping) {
391 last_ping = ceph_clock_now();
392 register_cb();
393 } else {
394 unregister_cb();
395 }
396}
397
398void Watch::disconnect()
399{
400 dout(10) << "disconnect (con was " << conn << ")" << dendl;
401 conn = ConnectionRef();
402 if (!will_ping)
403 register_cb();
404}
405
406void Watch::discard()
407{
408 dout(10) << "discard" << dendl;
f67539c2 409 for (auto i = in_progress_notifies.begin();
7c673cae
FG
410 i != in_progress_notifies.end();
411 ++i) {
412 i->second->discard();
413 }
414 discard_state();
415}
416
417void Watch::discard_state()
418{
11fdf7f2
TL
419 ceph_assert(pg->is_locked());
420 ceph_assert(!discarded);
421 ceph_assert(obc);
7c673cae
FG
422 in_progress_notifies.clear();
423 unregister_cb();
424 discarded = true;
9f95a23c 425 if (is_connected()) {
11fdf7f2
TL
426 if (auto priv = conn->get_priv(); priv) {
427 auto session = static_cast<Session*>(priv.get());
428 session->wstate.removeWatch(self.lock());
7c673cae
FG
429 }
430 conn = ConnectionRef();
431 }
432 obc = ObjectContextRef();
433}
434
435bool Watch::is_discarded() const
436{
437 return discarded;
438}
439
440void Watch::remove(bool send_disconnect)
441{
442 dout(10) << "remove" << dendl;
9f95a23c 443 if (send_disconnect && is_connected()) {
7c673cae
FG
444 bufferlist empty;
445 MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
446 CEPH_WATCH_EVENT_DISCONNECT, empty));
447 conn->send_message(reply);
448 }
f67539c2 449 for (auto i = in_progress_notifies.begin();
7c673cae
FG
450 i != in_progress_notifies.end();
451 ++i) {
452 i->second->complete_watcher_remove(self.lock());
453 }
454 discard_state();
455}
456
457void Watch::start_notify(NotifyRef notif)
458{
11fdf7f2 459 ceph_assert(in_progress_notifies.find(notif->notify_id) ==
7c673cae
FG
460 in_progress_notifies.end());
461 if (will_ping) {
462 utime_t cutoff = ceph_clock_now();
463 cutoff.sec_ref() -= timeout;
464 if (last_ping < cutoff) {
465 dout(10) << __func__ << " " << notif->notify_id
466 << " last_ping " << last_ping << " < cutoff " << cutoff
467 << ", disconnecting" << dendl;
468 disconnect();
469 return;
470 }
471 }
472 dout(10) << "start_notify " << notif->notify_id << dendl;
473 in_progress_notifies[notif->notify_id] = notif;
474 notif->start_watcher(self.lock());
9f95a23c 475 if (is_connected())
7c673cae
FG
476 send_notify(notif);
477}
478
479void Watch::cancel_notify(NotifyRef notif)
480{
481 dout(10) << "cancel_notify " << notif->notify_id << dendl;
482 in_progress_notifies.erase(notif->notify_id);
483}
484
485void Watch::send_notify(NotifyRef notif)
486{
487 dout(10) << "send_notify" << dendl;
488 MWatchNotify *notify_msg = new MWatchNotify(
9f95a23c
TL
489 cookie,
490 notif->version,
491 notif->notify_id,
492 CEPH_WATCH_EVENT_NOTIFY,
493 notif->payload,
494 notif->client_gid);
7c673cae
FG
495 conn->send_message(notify_msg);
496}
497
498void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
499{
500 dout(10) << "notify_ack" << dendl;
f67539c2 501 auto i = in_progress_notifies.find(notify_id);
7c673cae
FG
502 if (i != in_progress_notifies.end()) {
503 i->second->complete_watcher(self.lock(), reply_bl);
504 in_progress_notifies.erase(i);
505 }
506}
507
508WatchRef Watch::makeWatchRef(
509 PrimaryLogPG *pg, OSDService *osd,
510 ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
511{
512 WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
513 ret->set_self(ret);
514 return ret;
515}
516
517void WatchConState::addWatch(WatchRef watch)
518{
11fdf7f2 519 std::lock_guard l(lock);
7c673cae
FG
520 watches.insert(watch);
521}
522
523void WatchConState::removeWatch(WatchRef watch)
524{
11fdf7f2 525 std::lock_guard l(lock);
7c673cae
FG
526 watches.erase(watch);
527}
528
529void WatchConState::reset(Connection *con)
530{
531 set<WatchRef> _watches;
532 {
11fdf7f2 533 std::lock_guard l(lock);
7c673cae
FG
534 _watches.swap(watches);
535 }
536 for (set<WatchRef>::iterator i = _watches.begin();
537 i != _watches.end();
538 ++i) {
539 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
540 pg->lock();
541 if (!(*i)->is_discarded()) {
542 if ((*i)->is_connected(con)) {
543 (*i)->disconnect();
544 } else {
545 lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
546 }
547 }
548 pg->unlock();
549 }
550}