]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/Watch.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / osd / Watch.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2#include "PG.h"
3
4#include "include/types.h"
5#include "messages/MWatchNotify.h"
6
7#include <map>
8
9#include "OSD.h"
10#include "PrimaryLogPG.h"
11#include "Watch.h"
12#include "Session.h"
13
14#include "common/config.h"
15
16struct CancelableContext : public Context {
17 virtual void cancel() = 0;
18};
19
20#define dout_context osd->cct
21#define dout_subsys ceph_subsys_osd
22#undef dout_prefix
23#define dout_prefix _prefix(_dout, this)
24
25static ostream& _prefix(
26 std::ostream* _dout,
27 Notify *notify) {
28 return *_dout << notify->gen_dbg_prefix();
29}
30
31Notify::Notify(
32 ConnectionRef client,
33 uint64_t client_gid,
34 bufferlist &payload,
35 uint32_t timeout,
36 uint64_t cookie,
37 uint64_t notify_id,
38 uint64_t version,
39 OSDService *osd)
40 : client(client), client_gid(client_gid),
41 complete(false),
42 discarded(false),
43 timed_out(false),
44 payload(payload),
45 timeout(timeout),
46 cookie(cookie),
47 notify_id(notify_id),
48 version(version),
49 osd(osd),
50 cb(NULL),
51 lock("Notify::lock") {}
52
53NotifyRef Notify::makeNotifyRef(
54 ConnectionRef client,
55 uint64_t client_gid,
56 bufferlist &payload,
57 uint32_t timeout,
58 uint64_t cookie,
59 uint64_t notify_id,
60 uint64_t version,
61 OSDService *osd) {
62 NotifyRef ret(
63 new Notify(
64 client, client_gid,
65 payload, timeout,
66 cookie, notify_id,
67 version, osd));
68 ret->set_self(ret);
69 return ret;
70}
71
72class NotifyTimeoutCB : public CancelableContext {
73 NotifyRef notif;
74 bool canceled; // protected by notif lock
75public:
76 explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
77 void finish(int) override {
78 notif->osd->watch_lock.Unlock();
79 notif->lock.Lock();
80 if (!canceled)
81 notif->do_timeout(); // drops lock
82 else
83 notif->lock.Unlock();
84 notif->osd->watch_lock.Lock();
85 }
86 void cancel() override {
87 assert(notif->lock.is_locked_by_me());
88 canceled = true;
89 }
90};
91
92void Notify::do_timeout()
93{
94 assert(lock.is_locked_by_me());
95 dout(10) << "timeout" << dendl;
96 cb = NULL;
97 if (is_discarded()) {
98 lock.Unlock();
99 return;
100 }
101
102 timed_out = true; // we will send the client an error code
103 maybe_complete_notify();
104 assert(complete);
105 set<WatchRef> _watchers;
106 _watchers.swap(watchers);
107 lock.Unlock();
108
109 for (set<WatchRef>::iterator i = _watchers.begin();
110 i != _watchers.end();
111 ++i) {
112 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
113 pg->lock();
114 if (!(*i)->is_discarded()) {
115 (*i)->cancel_notify(self.lock());
116 }
117 pg->unlock();
118 }
119}
120
121void Notify::register_cb()
122{
123 assert(lock.is_locked_by_me());
124 {
125 osd->watch_lock.Lock();
126 cb = new NotifyTimeoutCB(self.lock());
127 osd->watch_timer.add_event_after(
128 timeout,
129 cb);
130 osd->watch_lock.Unlock();
131 }
132}
133
134void Notify::unregister_cb()
135{
136 assert(lock.is_locked_by_me());
137 if (!cb)
138 return;
139 cb->cancel();
140 {
141 osd->watch_lock.Lock();
142 osd->watch_timer.cancel_event(cb);
143 cb = NULL;
144 osd->watch_lock.Unlock();
145 }
146}
147
148void Notify::start_watcher(WatchRef watch)
149{
150 Mutex::Locker l(lock);
151 dout(10) << "start_watcher" << dendl;
152 watchers.insert(watch);
153}
154
155void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
156{
157 Mutex::Locker l(lock);
158 dout(10) << "complete_watcher" << dendl;
159 if (is_discarded())
160 return;
161 assert(watchers.count(watch));
162 watchers.erase(watch);
163 notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
164 watch->get_cookie()),
165 reply_bl));
166 maybe_complete_notify();
167}
168
169void Notify::complete_watcher_remove(WatchRef watch)
170{
171 Mutex::Locker l(lock);
172 dout(10) << __func__ << dendl;
173 if (is_discarded())
174 return;
175 assert(watchers.count(watch));
176 watchers.erase(watch);
177 maybe_complete_notify();
178}
179
180void Notify::maybe_complete_notify()
181{
182 dout(10) << "maybe_complete_notify -- "
183 << watchers.size()
184 << " in progress watchers " << dendl;
185 if (watchers.empty() || timed_out) {
186 // prepare reply
187 bufferlist bl;
188 ::encode(notify_replies, bl);
189 list<pair<uint64_t,uint64_t> > missed;
190 for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
191 missed.push_back(make_pair((*p)->get_watcher_gid(),
192 (*p)->get_cookie()));
193 }
194 ::encode(missed, bl);
195
196 bufferlist empty;
197 MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
198 CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
199 reply->notifier_gid = client_gid;
200 reply->set_data(bl);
201 if (timed_out)
202 reply->return_code = -ETIMEDOUT;
203 client->send_message(reply);
204 unregister_cb();
205
206 complete = true;
207 }
208}
209
210void Notify::discard()
211{
212 Mutex::Locker l(lock);
213 discarded = true;
214 unregister_cb();
215 watchers.clear();
216}
217
218void Notify::init()
219{
220 Mutex::Locker l(lock);
221 register_cb();
222 maybe_complete_notify();
223}
224
225#define dout_subsys ceph_subsys_osd
226#undef dout_prefix
227#define dout_prefix _prefix(_dout, watch.get())
228
229static ostream& _prefix(
230 std::ostream* _dout,
231 Watch *watch) {
232 return *_dout << watch->gen_dbg_prefix();
233}
234
235class HandleWatchTimeout : public CancelableContext {
236 WatchRef watch;
237public:
238 bool canceled; // protected by watch->pg->lock
239 explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
240 void cancel() override {
241 canceled = true;
242 }
243 void finish(int) override { ceph_abort(); /* not used */ }
244 void complete(int) override {
245 OSDService *osd(watch->osd);
246 ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
247 boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
248 osd->watch_lock.Unlock();
249 pg->lock();
250 watch->cb = NULL;
251 if (!watch->is_discarded() && !canceled)
252 watch->pg->handle_watch_timeout(watch);
253 delete this; // ~Watch requires pg lock!
254 pg->unlock();
255 osd->watch_lock.Lock();
256 }
257};
258
259class HandleDelayedWatchTimeout : public CancelableContext {
260 WatchRef watch;
261public:
262 bool canceled;
263 explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
264 void cancel() override {
265 canceled = true;
266 }
267 void finish(int) override {
268 OSDService *osd(watch->osd);
269 dout(10) << "HandleWatchTimeoutDelayed" << dendl;
270 assert(watch->pg->is_locked());
271 watch->cb = NULL;
272 if (!watch->is_discarded() && !canceled)
273 watch->pg->handle_watch_timeout(watch);
274 }
275};
276
277#define dout_subsys ceph_subsys_osd
278#undef dout_prefix
279#define dout_prefix _prefix(_dout, this)
280
281string Watch::gen_dbg_prefix() {
282 stringstream ss;
283 ss << pg->gen_prefix() << " -- Watch("
284 << make_pair(cookie, entity) << ") ";
285 return ss.str();
286}
287
288Watch::Watch(
289 PrimaryLogPG *pg,
290 OSDService *osd,
291 ObjectContextRef obc,
292 uint32_t timeout,
293 uint64_t cookie,
294 entity_name_t entity,
295 const entity_addr_t &addr)
296 : cb(NULL),
297 osd(osd),
298 pg(pg),
299 obc(obc),
300 timeout(timeout),
301 cookie(cookie),
302 addr(addr),
303 will_ping(false),
304 entity(entity),
305 discarded(false) {
306 dout(10) << "Watch()" << dendl;
307}
308
309Watch::~Watch() {
310 dout(10) << "~Watch" << dendl;
311 // users must have called remove() or discard() prior to this point
312 assert(!obc);
313 assert(!conn);
314}
315
316bool Watch::connected() { return !!conn; }
317
318Context *Watch::get_delayed_cb()
319{
320 assert(!cb);
321 cb = new HandleDelayedWatchTimeout(self.lock());
322 return cb;
323}
324
325void Watch::register_cb()
326{
327 Mutex::Locker l(osd->watch_lock);
328 if (cb) {
329 dout(15) << "re-registering callback, timeout: " << timeout << dendl;
330 cb->cancel();
331 osd->watch_timer.cancel_event(cb);
332 } else {
333 dout(15) << "registering callback, timeout: " << timeout << dendl;
334 }
335 cb = new HandleWatchTimeout(self.lock());
336 osd->watch_timer.add_event_after(
337 timeout,
338 cb);
339}
340
341void Watch::unregister_cb()
342{
343 dout(15) << "unregister_cb" << dendl;
344 if (!cb)
345 return;
346 dout(15) << "actually registered, cancelling" << dendl;
347 cb->cancel();
348 {
349 Mutex::Locker l(osd->watch_lock);
350 osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
351 }
352 cb = NULL;
353}
354
355void Watch::got_ping(utime_t t)
356{
357 last_ping = t;
358 if (conn) {
359 register_cb();
360 }
361}
362
363void Watch::connect(ConnectionRef con, bool _will_ping)
364{
365 if (conn == con) {
366 dout(10) << __func__ << " con " << con << " - already connected" << dendl;
367 return;
368 }
369 dout(10) << __func__ << " con " << con << dendl;
370 conn = con;
371 will_ping = _will_ping;
372 Session* sessionref(static_cast<Session*>(con->get_priv()));
373 if (sessionref) {
374 sessionref->wstate.addWatch(self.lock());
375 sessionref->put();
376 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
377 i != in_progress_notifies.end();
378 ++i) {
379 send_notify(i->second);
380 }
381 }
382 if (will_ping) {
383 last_ping = ceph_clock_now();
384 register_cb();
385 } else {
386 unregister_cb();
387 }
388}
389
390void Watch::disconnect()
391{
392 dout(10) << "disconnect (con was " << conn << ")" << dendl;
393 conn = ConnectionRef();
394 if (!will_ping)
395 register_cb();
396}
397
398void Watch::discard()
399{
400 dout(10) << "discard" << dendl;
401 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
402 i != in_progress_notifies.end();
403 ++i) {
404 i->second->discard();
405 }
406 discard_state();
407}
408
409void Watch::discard_state()
410{
411 assert(pg->is_locked());
412 assert(!discarded);
413 assert(obc);
414 in_progress_notifies.clear();
415 unregister_cb();
416 discarded = true;
417 if (conn) {
418 Session* sessionref(static_cast<Session*>(conn->get_priv()));
419 if (sessionref) {
420 sessionref->wstate.removeWatch(self.lock());
421 sessionref->put();
422 }
423 conn = ConnectionRef();
424 }
425 obc = ObjectContextRef();
426}
427
428bool Watch::is_discarded() const
429{
430 return discarded;
431}
432
433void Watch::remove(bool send_disconnect)
434{
435 dout(10) << "remove" << dendl;
436 if (send_disconnect && conn) {
437 bufferlist empty;
438 MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
439 CEPH_WATCH_EVENT_DISCONNECT, empty));
440 conn->send_message(reply);
441 }
442 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
443 i != in_progress_notifies.end();
444 ++i) {
445 i->second->complete_watcher_remove(self.lock());
446 }
447 discard_state();
448}
449
450void Watch::start_notify(NotifyRef notif)
451{
452 assert(in_progress_notifies.find(notif->notify_id) ==
453 in_progress_notifies.end());
454 if (will_ping) {
455 utime_t cutoff = ceph_clock_now();
456 cutoff.sec_ref() -= timeout;
457 if (last_ping < cutoff) {
458 dout(10) << __func__ << " " << notif->notify_id
459 << " last_ping " << last_ping << " < cutoff " << cutoff
460 << ", disconnecting" << dendl;
461 disconnect();
462 return;
463 }
464 }
465 dout(10) << "start_notify " << notif->notify_id << dendl;
466 in_progress_notifies[notif->notify_id] = notif;
467 notif->start_watcher(self.lock());
468 if (connected())
469 send_notify(notif);
470}
471
472void Watch::cancel_notify(NotifyRef notif)
473{
474 dout(10) << "cancel_notify " << notif->notify_id << dendl;
475 in_progress_notifies.erase(notif->notify_id);
476}
477
478void Watch::send_notify(NotifyRef notif)
479{
480 dout(10) << "send_notify" << dendl;
481 MWatchNotify *notify_msg = new MWatchNotify(
482 cookie, notif->version, notif->notify_id,
483 CEPH_WATCH_EVENT_NOTIFY, notif->payload);
484 notify_msg->notifier_gid = notif->client_gid;
485 conn->send_message(notify_msg);
486}
487
488void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
489{
490 dout(10) << "notify_ack" << dendl;
491 map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
492 if (i != in_progress_notifies.end()) {
493 i->second->complete_watcher(self.lock(), reply_bl);
494 in_progress_notifies.erase(i);
495 }
496}
497
498WatchRef Watch::makeWatchRef(
499 PrimaryLogPG *pg, OSDService *osd,
500 ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
501{
502 WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
503 ret->set_self(ret);
504 return ret;
505}
506
507void WatchConState::addWatch(WatchRef watch)
508{
509 Mutex::Locker l(lock);
510 watches.insert(watch);
511}
512
513void WatchConState::removeWatch(WatchRef watch)
514{
515 Mutex::Locker l(lock);
516 watches.erase(watch);
517}
518
519void WatchConState::reset(Connection *con)
520{
521 set<WatchRef> _watches;
522 {
523 Mutex::Locker l(lock);
524 _watches.swap(watches);
525 }
526 for (set<WatchRef>::iterator i = _watches.begin();
527 i != _watches.end();
528 ++i) {
529 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
530 pg->lock();
531 if (!(*i)->is_discarded()) {
532 if ((*i)->is_connected(con)) {
533 (*i)->disconnect();
534 } else {
535 lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
536 }
537 }
538 pg->unlock();
539 }
540}