]> git.proxmox.com Git - ceph.git/blob - ceph/src/osd/Watch.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / osd / Watch.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include "PG.h"
3
4 #include "include/types.h"
5 #include "messages/MWatchNotify.h"
6
7 #include <map>
8
9 #include "OSD.h"
10 #include "PrimaryLogPG.h"
11 #include "Watch.h"
12 #include "Session.h"
13
14 #include "common/config.h"
15
16 #define dout_context osd->cct
17 #define dout_subsys ceph_subsys_osd
18 #undef dout_prefix
19 #define dout_prefix _prefix(_dout, this)
20
21 using std::list;
22 using std::make_pair;
23 using std::pair;
24 using std::ostream;
25 using std::set;
26 using std::vector;
27
28 using ceph::bufferlist;
29 using ceph::decode;
30 using ceph::encode;
31
32 struct CancelableContext : public Context {
33 virtual void cancel() = 0;
34 };
35
36
37 static ostream& _prefix(
38 std::ostream* _dout,
39 Notify *notify) {
40 return notify->gen_dbg_prefix(*_dout);
41 }
42
43 Notify::Notify(
44 ConnectionRef client,
45 uint64_t client_gid,
46 bufferlist &payload,
47 uint32_t timeout,
48 uint64_t cookie,
49 uint64_t notify_id,
50 uint64_t version,
51 OSDService *osd)
52 : client(client),
53 client_gid(client_gid),
54 complete(false),
55 discarded(false),
56 timed_out(false),
57 payload(payload),
58 timeout(timeout),
59 cookie(cookie),
60 notify_id(notify_id),
61 version(version),
62 osd(osd),
63 cb(nullptr) {}
64
65 NotifyRef Notify::makeNotifyRef(
66 ConnectionRef client,
67 uint64_t client_gid,
68 bufferlist &payload,
69 uint32_t timeout,
70 uint64_t cookie,
71 uint64_t notify_id,
72 uint64_t version,
73 OSDService *osd) {
74 NotifyRef ret(
75 new Notify(
76 client, client_gid,
77 payload, timeout,
78 cookie, notify_id,
79 version, osd));
80 ret->set_self(ret);
81 return ret;
82 }
83
84 class NotifyTimeoutCB : public CancelableContext {
85 NotifyRef notif;
86 bool canceled; // protected by notif lock
87 public:
88 explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
89 void finish(int) override {
90 notif->osd->watch_lock.unlock();
91 notif->lock.lock();
92 if (!canceled)
93 notif->do_timeout(); // drops lock
94 else
95 notif->lock.unlock();
96 notif->osd->watch_lock.lock();
97 }
98 void cancel() override {
99 ceph_assert(ceph_mutex_is_locked(notif->lock));
100 canceled = true;
101 }
102 };
103
104 void Notify::do_timeout()
105 {
106 ceph_assert(ceph_mutex_is_locked(lock));
107 dout(10) << "timeout" << dendl;
108 cb = nullptr;
109 if (is_discarded()) {
110 lock.unlock();
111 return;
112 }
113
114 timed_out = true; // we will send the client an error code
115 maybe_complete_notify();
116 ceph_assert(complete);
117 set<WatchRef> _watchers;
118 _watchers.swap(watchers);
119 lock.unlock();
120
121 for (auto i = _watchers.begin(); i != _watchers.end(); ++i) {
122 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
123 pg->lock();
124 if (!(*i)->is_discarded()) {
125 (*i)->cancel_notify(self.lock());
126 }
127 pg->unlock();
128 }
129 }
130
131 void Notify::register_cb()
132 {
133 ceph_assert(ceph_mutex_is_locked(lock));
134 {
135 std::lock_guard l{osd->watch_lock};
136 cb = new NotifyTimeoutCB(self.lock());
137 if (!osd->watch_timer.add_event_after(timeout, cb)) {
138 cb = nullptr;
139 }
140 }
141 }
142
143 void Notify::unregister_cb()
144 {
145 ceph_assert(ceph_mutex_is_locked(lock));
146 if (!cb)
147 return;
148 cb->cancel();
149 {
150 std::lock_guard l{osd->watch_lock};
151 osd->watch_timer.cancel_event(cb);
152 cb = nullptr;
153 }
154 }
155
156 void Notify::start_watcher(WatchRef watch)
157 {
158 std::lock_guard l(lock);
159 dout(10) << "start_watcher" << dendl;
160 watchers.insert(watch);
161 }
162
163 void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
164 {
165 std::lock_guard l(lock);
166 dout(10) << "complete_watcher" << dendl;
167 if (is_discarded())
168 return;
169 ceph_assert(watchers.count(watch));
170 watchers.erase(watch);
171 notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
172 watch->get_cookie()),
173 reply_bl));
174 maybe_complete_notify();
175 }
176
177 void Notify::complete_watcher_remove(WatchRef watch)
178 {
179 std::lock_guard l(lock);
180 dout(10) << __func__ << dendl;
181 if (is_discarded())
182 return;
183 ceph_assert(watchers.count(watch));
184 watchers.erase(watch);
185 maybe_complete_notify();
186 }
187
188 void Notify::maybe_complete_notify()
189 {
190 dout(10) << "maybe_complete_notify -- "
191 << watchers.size()
192 << " in progress watchers " << dendl;
193 if (watchers.empty() || timed_out) {
194 // prepare reply
195 bufferlist bl;
196 encode(notify_replies, bl);
197 vector<pair<uint64_t,uint64_t>> missed;
198 missed.reserve(watchers.size());
199 for (auto& watcher : watchers) {
200 missed.emplace_back(watcher->get_watcher_gid(),
201 watcher->get_cookie());
202 }
203 encode(missed, bl);
204
205 bufferlist empty;
206 auto* const reply = new MWatchNotify(
207 cookie,
208 version,
209 notify_id,
210 CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
211 empty,
212 client_gid);
213 reply->set_data(bl);
214 if (timed_out)
215 reply->return_code = -ETIMEDOUT;
216 client->send_message(reply);
217 unregister_cb();
218
219 complete = true;
220 }
221 }
222
223 void Notify::discard()
224 {
225 std::lock_guard l(lock);
226 discarded = true;
227 unregister_cb();
228 watchers.clear();
229 }
230
231 void Notify::init()
232 {
233 std::lock_guard l(lock);
234 register_cb();
235 maybe_complete_notify();
236 }
237
238 #define dout_subsys ceph_subsys_osd
239 #undef dout_prefix
240 #define dout_prefix _prefix(_dout, watch.get())
241
242 static ostream& _prefix(
243 std::ostream* _dout,
244 Watch *watch) {
245 return watch->gen_dbg_prefix(*_dout);
246 }
247
248 class HandleWatchTimeout : public CancelableContext {
249 WatchRef watch;
250 public:
251 bool canceled; // protected by watch->pg->lock
252 explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
253 void cancel() override {
254 canceled = true;
255 }
256 void finish(int) override { ceph_abort(); /* not used */ }
257 void complete(int) override {
258 OSDService *osd(watch->osd);
259 ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
260 boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
261 osd->watch_lock.unlock();
262 pg->lock();
263 watch->cb = nullptr;
264 if (!watch->is_discarded() && !canceled)
265 watch->pg->handle_watch_timeout(watch);
266 delete this; // ~Watch requires pg lock!
267 pg->unlock();
268 osd->watch_lock.lock();
269 }
270 };
271
272 class HandleDelayedWatchTimeout : public CancelableContext {
273 WatchRef watch;
274 public:
275 bool canceled;
276 explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
277 void cancel() override {
278 canceled = true;
279 }
280 void finish(int) override {
281 OSDService *osd(watch->osd);
282 dout(10) << "HandleWatchTimeoutDelayed" << dendl;
283 ceph_assert(watch->pg->is_locked());
284 watch->cb = nullptr;
285 if (!watch->is_discarded() && !canceled)
286 watch->pg->handle_watch_timeout(watch);
287 }
288 };
289
290 #define dout_subsys ceph_subsys_osd
291 #undef dout_prefix
292 #define dout_prefix _prefix(_dout, this)
293
294 std::ostream& Watch::gen_dbg_prefix(std::ostream& out) {
295 return pg->gen_prefix(out) << " -- Watch("
296 << make_pair(cookie, entity) << ") ";
297 }
298
299 Watch::Watch(
300 PrimaryLogPG *pg,
301 OSDService *osd,
302 ObjectContextRef obc,
303 uint32_t timeout,
304 uint64_t cookie,
305 entity_name_t entity,
306 const entity_addr_t &addr)
307 : cb(NULL),
308 osd(osd),
309 pg(pg),
310 obc(obc),
311 timeout(timeout),
312 cookie(cookie),
313 addr(addr),
314 will_ping(false),
315 entity(entity),
316 discarded(false) {
317 dout(10) << "Watch()" << dendl;
318 }
319
320 Watch::~Watch() {
321 dout(10) << "~Watch" << dendl;
322 // users must have called remove() or discard() prior to this point
323 ceph_assert(!obc);
324 ceph_assert(!is_connected());
325 }
326
327 Context *Watch::get_delayed_cb()
328 {
329 ceph_assert(!cb);
330 cb = new HandleDelayedWatchTimeout(self.lock());
331 return cb;
332 }
333
334 void Watch::register_cb()
335 {
336 std::lock_guard l(osd->watch_lock);
337 if (cb) {
338 dout(15) << "re-registering callback, timeout: " << timeout << dendl;
339 cb->cancel();
340 osd->watch_timer.cancel_event(cb);
341 } else {
342 dout(15) << "registering callback, timeout: " << timeout << dendl;
343 }
344 cb = new HandleWatchTimeout(self.lock());
345 if (!osd->watch_timer.add_event_after(timeout, cb)) {
346 cb = nullptr;
347 }
348 }
349
350 void Watch::unregister_cb()
351 {
352 dout(15) << "unregister_cb" << dendl;
353 if (!cb)
354 return;
355 dout(15) << "actually registered, cancelling" << dendl;
356 cb->cancel();
357 {
358 std::lock_guard l(osd->watch_lock);
359 osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
360 }
361 cb = nullptr;
362 }
363
364 void Watch::got_ping(utime_t t)
365 {
366 last_ping = t;
367 if (is_connected()) {
368 register_cb();
369 }
370 }
371
372 void Watch::connect(ConnectionRef con, bool _will_ping)
373 {
374 if (is_connected(con.get())) {
375 dout(10) << __func__ << " con " << con << " - already connected" << dendl;
376 return;
377 }
378 dout(10) << __func__ << " con " << con << dendl;
379 conn = con;
380 will_ping = _will_ping;
381 auto priv = con->get_priv();
382 if (priv) {
383 auto sessionref = static_cast<Session*>(priv.get());
384 sessionref->wstate.addWatch(self.lock());
385 priv.reset();
386 for (auto i = in_progress_notifies.begin();
387 i != in_progress_notifies.end();
388 ++i) {
389 send_notify(i->second);
390 }
391 }
392 if (will_ping) {
393 last_ping = ceph_clock_now();
394 register_cb();
395 } else {
396 unregister_cb();
397 }
398 }
399
400 void Watch::disconnect()
401 {
402 dout(10) << "disconnect (con was " << conn << ")" << dendl;
403 conn = ConnectionRef();
404 if (!will_ping)
405 register_cb();
406 }
407
408 void Watch::discard()
409 {
410 dout(10) << "discard" << dendl;
411 for (auto i = in_progress_notifies.begin();
412 i != in_progress_notifies.end();
413 ++i) {
414 i->second->discard();
415 }
416 discard_state();
417 }
418
419 void Watch::discard_state()
420 {
421 ceph_assert(pg->is_locked());
422 ceph_assert(!discarded);
423 ceph_assert(obc);
424 in_progress_notifies.clear();
425 unregister_cb();
426 discarded = true;
427 if (is_connected()) {
428 if (auto priv = conn->get_priv(); priv) {
429 auto session = static_cast<Session*>(priv.get());
430 session->wstate.removeWatch(self.lock());
431 }
432 conn = ConnectionRef();
433 }
434 obc = ObjectContextRef();
435 }
436
437 bool Watch::is_discarded() const
438 {
439 return discarded;
440 }
441
442 void Watch::remove(bool send_disconnect)
443 {
444 dout(10) << "remove" << dendl;
445 if (send_disconnect && is_connected()) {
446 bufferlist empty;
447 MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
448 CEPH_WATCH_EVENT_DISCONNECT, empty));
449 conn->send_message(reply);
450 }
451 for (auto i = in_progress_notifies.begin();
452 i != in_progress_notifies.end();
453 ++i) {
454 i->second->complete_watcher_remove(self.lock());
455 }
456 discard_state();
457 }
458
459 void Watch::start_notify(NotifyRef notif)
460 {
461 ceph_assert(in_progress_notifies.find(notif->notify_id) ==
462 in_progress_notifies.end());
463 if (will_ping) {
464 utime_t cutoff = ceph_clock_now();
465 cutoff.sec_ref() -= timeout;
466 if (last_ping < cutoff) {
467 dout(10) << __func__ << " " << notif->notify_id
468 << " last_ping " << last_ping << " < cutoff " << cutoff
469 << ", disconnecting" << dendl;
470 disconnect();
471 return;
472 }
473 }
474 dout(10) << "start_notify " << notif->notify_id << dendl;
475 in_progress_notifies[notif->notify_id] = notif;
476 notif->start_watcher(self.lock());
477 if (is_connected())
478 send_notify(notif);
479 }
480
481 void Watch::cancel_notify(NotifyRef notif)
482 {
483 dout(10) << "cancel_notify " << notif->notify_id << dendl;
484 in_progress_notifies.erase(notif->notify_id);
485 }
486
487 void Watch::send_notify(NotifyRef notif)
488 {
489 dout(10) << "send_notify" << dendl;
490 MWatchNotify *notify_msg = new MWatchNotify(
491 cookie,
492 notif->version,
493 notif->notify_id,
494 CEPH_WATCH_EVENT_NOTIFY,
495 notif->payload,
496 notif->client_gid);
497 conn->send_message(notify_msg);
498 }
499
500 void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
501 {
502 dout(10) << "notify_ack" << dendl;
503 auto i = in_progress_notifies.find(notify_id);
504 if (i != in_progress_notifies.end()) {
505 i->second->complete_watcher(self.lock(), reply_bl);
506 in_progress_notifies.erase(i);
507 }
508 }
509
510 WatchRef Watch::makeWatchRef(
511 PrimaryLogPG *pg, OSDService *osd,
512 ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
513 {
514 WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
515 ret->set_self(ret);
516 return ret;
517 }
518
519 void WatchConState::addWatch(WatchRef watch)
520 {
521 std::lock_guard l(lock);
522 watches.insert(watch);
523 }
524
525 void WatchConState::removeWatch(WatchRef watch)
526 {
527 std::lock_guard l(lock);
528 watches.erase(watch);
529 }
530
531 void WatchConState::reset(Connection *con)
532 {
533 set<WatchRef> _watches;
534 {
535 std::lock_guard l(lock);
536 _watches.swap(watches);
537 }
538 for (set<WatchRef>::iterator i = _watches.begin();
539 i != _watches.end();
540 ++i) {
541 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
542 pg->lock();
543 if (!(*i)->is_discarded()) {
544 if ((*i)->is_connected(con)) {
545 (*i)->disconnect();
546 } else {
547 lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
548 }
549 }
550 pg->unlock();
551 }
552 }