]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | #include "PG.h" | |
3 | ||
4 | #include "include/types.h" | |
5 | #include "messages/MWatchNotify.h" | |
6 | ||
7 | #include <map> | |
8 | ||
9 | #include "OSD.h" | |
10 | #include "PrimaryLogPG.h" | |
11 | #include "Watch.h" | |
12 | #include "Session.h" | |
13 | ||
14 | #include "common/config.h" | |
15 | ||
7c673cae FG |
16 | #define dout_context osd->cct |
17 | #define dout_subsys ceph_subsys_osd | |
18 | #undef dout_prefix | |
19 | #define dout_prefix _prefix(_dout, this) | |
20 | ||
f67539c2 TL |
21 | using std::list; |
22 | using std::make_pair; | |
23 | using std::pair; | |
24 | using std::ostream; | |
25 | using std::set; | |
26 | ||
27 | using ceph::bufferlist; | |
28 | using ceph::decode; | |
29 | using ceph::encode; | |
30 | ||
31 | struct CancelableContext : public Context { | |
32 | virtual void cancel() = 0; | |
33 | }; | |
34 | ||
35 | ||
7c673cae FG |
36 | static ostream& _prefix( |
37 | std::ostream* _dout, | |
38 | Notify *notify) { | |
11fdf7f2 | 39 | return notify->gen_dbg_prefix(*_dout); |
7c673cae FG |
40 | } |
41 | ||
42 | Notify::Notify( | |
43 | ConnectionRef client, | |
44 | uint64_t client_gid, | |
45 | bufferlist &payload, | |
46 | uint32_t timeout, | |
47 | uint64_t cookie, | |
48 | uint64_t notify_id, | |
49 | uint64_t version, | |
50 | OSDService *osd) | |
9f95a23c TL |
51 | : client(client), |
52 | client_gid(client_gid), | |
7c673cae FG |
53 | complete(false), |
54 | discarded(false), | |
55 | timed_out(false), | |
56 | payload(payload), | |
57 | timeout(timeout), | |
58 | cookie(cookie), | |
59 | notify_id(notify_id), | |
60 | version(version), | |
61 | osd(osd), | |
9f95a23c | 62 | cb(nullptr) {} |
7c673cae FG |
63 | |
64 | NotifyRef Notify::makeNotifyRef( | |
65 | ConnectionRef client, | |
66 | uint64_t client_gid, | |
67 | bufferlist &payload, | |
68 | uint32_t timeout, | |
69 | uint64_t cookie, | |
70 | uint64_t notify_id, | |
71 | uint64_t version, | |
72 | OSDService *osd) { | |
73 | NotifyRef ret( | |
74 | new Notify( | |
75 | client, client_gid, | |
76 | payload, timeout, | |
77 | cookie, notify_id, | |
78 | version, osd)); | |
79 | ret->set_self(ret); | |
80 | return ret; | |
81 | } | |
82 | ||
83 | class NotifyTimeoutCB : public CancelableContext { | |
84 | NotifyRef notif; | |
85 | bool canceled; // protected by notif lock | |
86 | public: | |
87 | explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {} | |
88 | void finish(int) override { | |
9f95a23c TL |
89 | notif->osd->watch_lock.unlock(); |
90 | notif->lock.lock(); | |
7c673cae FG |
91 | if (!canceled) |
92 | notif->do_timeout(); // drops lock | |
93 | else | |
9f95a23c TL |
94 | notif->lock.unlock(); |
95 | notif->osd->watch_lock.lock(); | |
7c673cae FG |
96 | } |
97 | void cancel() override { | |
9f95a23c | 98 | ceph_assert(ceph_mutex_is_locked(notif->lock)); |
7c673cae FG |
99 | canceled = true; |
100 | } | |
101 | }; | |
102 | ||
103 | void Notify::do_timeout() | |
104 | { | |
9f95a23c | 105 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae | 106 | dout(10) << "timeout" << dendl; |
11fdf7f2 | 107 | cb = nullptr; |
7c673cae | 108 | if (is_discarded()) { |
9f95a23c | 109 | lock.unlock(); |
7c673cae FG |
110 | return; |
111 | } | |
112 | ||
113 | timed_out = true; // we will send the client an error code | |
114 | maybe_complete_notify(); | |
11fdf7f2 | 115 | ceph_assert(complete); |
7c673cae FG |
116 | set<WatchRef> _watchers; |
117 | _watchers.swap(watchers); | |
9f95a23c | 118 | lock.unlock(); |
7c673cae | 119 | |
f67539c2 | 120 | for (auto i = _watchers.begin(); i != _watchers.end(); ++i) { |
7c673cae FG |
121 | boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg()); |
122 | pg->lock(); | |
123 | if (!(*i)->is_discarded()) { | |
124 | (*i)->cancel_notify(self.lock()); | |
125 | } | |
126 | pg->unlock(); | |
127 | } | |
128 | } | |
129 | ||
130 | void Notify::register_cb() | |
131 | { | |
9f95a23c | 132 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae | 133 | { |
9f95a23c | 134 | std::lock_guard l{osd->watch_lock}; |
7c673cae | 135 | cb = new NotifyTimeoutCB(self.lock()); |
3efd9988 FG |
136 | if (!osd->watch_timer.add_event_after(timeout, cb)) { |
137 | cb = nullptr; | |
138 | } | |
7c673cae FG |
139 | } |
140 | } | |
141 | ||
142 | void Notify::unregister_cb() | |
143 | { | |
9f95a23c | 144 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae FG |
145 | if (!cb) |
146 | return; | |
147 | cb->cancel(); | |
148 | { | |
9f95a23c | 149 | std::lock_guard l{osd->watch_lock}; |
7c673cae | 150 | osd->watch_timer.cancel_event(cb); |
11fdf7f2 | 151 | cb = nullptr; |
7c673cae FG |
152 | } |
153 | } | |
154 | ||
155 | void Notify::start_watcher(WatchRef watch) | |
156 | { | |
11fdf7f2 | 157 | std::lock_guard l(lock); |
7c673cae FG |
158 | dout(10) << "start_watcher" << dendl; |
159 | watchers.insert(watch); | |
160 | } | |
161 | ||
162 | void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl) | |
163 | { | |
11fdf7f2 | 164 | std::lock_guard l(lock); |
7c673cae FG |
165 | dout(10) << "complete_watcher" << dendl; |
166 | if (is_discarded()) | |
167 | return; | |
11fdf7f2 | 168 | ceph_assert(watchers.count(watch)); |
7c673cae FG |
169 | watchers.erase(watch); |
170 | notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(), | |
171 | watch->get_cookie()), | |
172 | reply_bl)); | |
173 | maybe_complete_notify(); | |
174 | } | |
175 | ||
176 | void Notify::complete_watcher_remove(WatchRef watch) | |
177 | { | |
11fdf7f2 | 178 | std::lock_guard l(lock); |
7c673cae FG |
179 | dout(10) << __func__ << dendl; |
180 | if (is_discarded()) | |
181 | return; | |
11fdf7f2 | 182 | ceph_assert(watchers.count(watch)); |
7c673cae FG |
183 | watchers.erase(watch); |
184 | maybe_complete_notify(); | |
185 | } | |
186 | ||
187 | void Notify::maybe_complete_notify() | |
188 | { | |
189 | dout(10) << "maybe_complete_notify -- " | |
190 | << watchers.size() | |
191 | << " in progress watchers " << dendl; | |
192 | if (watchers.empty() || timed_out) { | |
193 | // prepare reply | |
194 | bufferlist bl; | |
11fdf7f2 | 195 | encode(notify_replies, bl); |
7c673cae | 196 | list<pair<uint64_t,uint64_t> > missed; |
f67539c2 | 197 | for (auto p = watchers.begin(); p != watchers.end(); ++p) { |
7c673cae FG |
198 | missed.push_back(make_pair((*p)->get_watcher_gid(), |
199 | (*p)->get_cookie())); | |
200 | } | |
11fdf7f2 | 201 | encode(missed, bl); |
7c673cae FG |
202 | |
203 | bufferlist empty; | |
9f95a23c TL |
204 | auto* const reply = new MWatchNotify( |
205 | cookie, | |
206 | version, | |
207 | notify_id, | |
208 | CEPH_WATCH_EVENT_NOTIFY_COMPLETE, | |
209 | empty, | |
210 | client_gid); | |
7c673cae FG |
211 | reply->set_data(bl); |
212 | if (timed_out) | |
213 | reply->return_code = -ETIMEDOUT; | |
214 | client->send_message(reply); | |
215 | unregister_cb(); | |
216 | ||
217 | complete = true; | |
218 | } | |
219 | } | |
220 | ||
221 | void Notify::discard() | |
222 | { | |
11fdf7f2 | 223 | std::lock_guard l(lock); |
7c673cae FG |
224 | discarded = true; |
225 | unregister_cb(); | |
226 | watchers.clear(); | |
227 | } | |
228 | ||
229 | void Notify::init() | |
230 | { | |
11fdf7f2 | 231 | std::lock_guard l(lock); |
7c673cae FG |
232 | register_cb(); |
233 | maybe_complete_notify(); | |
234 | } | |
235 | ||
236 | #define dout_subsys ceph_subsys_osd | |
237 | #undef dout_prefix | |
238 | #define dout_prefix _prefix(_dout, watch.get()) | |
239 | ||
240 | static ostream& _prefix( | |
241 | std::ostream* _dout, | |
242 | Watch *watch) { | |
11fdf7f2 | 243 | return watch->gen_dbg_prefix(*_dout); |
7c673cae FG |
244 | } |
245 | ||
246 | class HandleWatchTimeout : public CancelableContext { | |
247 | WatchRef watch; | |
248 | public: | |
249 | bool canceled; // protected by watch->pg->lock | |
250 | explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} | |
251 | void cancel() override { | |
252 | canceled = true; | |
253 | } | |
254 | void finish(int) override { ceph_abort(); /* not used */ } | |
255 | void complete(int) override { | |
256 | OSDService *osd(watch->osd); | |
257 | ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl; | |
258 | boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg); | |
9f95a23c | 259 | osd->watch_lock.unlock(); |
7c673cae | 260 | pg->lock(); |
11fdf7f2 | 261 | watch->cb = nullptr; |
7c673cae FG |
262 | if (!watch->is_discarded() && !canceled) |
263 | watch->pg->handle_watch_timeout(watch); | |
264 | delete this; // ~Watch requires pg lock! | |
265 | pg->unlock(); | |
9f95a23c | 266 | osd->watch_lock.lock(); |
7c673cae FG |
267 | } |
268 | }; | |
269 | ||
270 | class HandleDelayedWatchTimeout : public CancelableContext { | |
271 | WatchRef watch; | |
272 | public: | |
273 | bool canceled; | |
274 | explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} | |
275 | void cancel() override { | |
276 | canceled = true; | |
277 | } | |
278 | void finish(int) override { | |
279 | OSDService *osd(watch->osd); | |
280 | dout(10) << "HandleWatchTimeoutDelayed" << dendl; | |
11fdf7f2 TL |
281 | ceph_assert(watch->pg->is_locked()); |
282 | watch->cb = nullptr; | |
7c673cae FG |
283 | if (!watch->is_discarded() && !canceled) |
284 | watch->pg->handle_watch_timeout(watch); | |
285 | } | |
286 | }; | |
287 | ||
288 | #define dout_subsys ceph_subsys_osd | |
289 | #undef dout_prefix | |
290 | #define dout_prefix _prefix(_dout, this) | |
291 | ||
11fdf7f2 TL |
292 | std::ostream& Watch::gen_dbg_prefix(std::ostream& out) { |
293 | return pg->gen_prefix(out) << " -- Watch(" | |
294 | << make_pair(cookie, entity) << ") "; | |
7c673cae FG |
295 | } |
296 | ||
297 | Watch::Watch( | |
298 | PrimaryLogPG *pg, | |
299 | OSDService *osd, | |
300 | ObjectContextRef obc, | |
301 | uint32_t timeout, | |
302 | uint64_t cookie, | |
303 | entity_name_t entity, | |
304 | const entity_addr_t &addr) | |
305 | : cb(NULL), | |
306 | osd(osd), | |
307 | pg(pg), | |
308 | obc(obc), | |
309 | timeout(timeout), | |
310 | cookie(cookie), | |
311 | addr(addr), | |
312 | will_ping(false), | |
313 | entity(entity), | |
314 | discarded(false) { | |
315 | dout(10) << "Watch()" << dendl; | |
316 | } | |
317 | ||
318 | Watch::~Watch() { | |
319 | dout(10) << "~Watch" << dendl; | |
320 | // users must have called remove() or discard() prior to this point | |
11fdf7f2 | 321 | ceph_assert(!obc); |
9f95a23c | 322 | ceph_assert(!is_connected()); |
7c673cae FG |
323 | } |
324 | ||
7c673cae FG |
325 | Context *Watch::get_delayed_cb() |
326 | { | |
11fdf7f2 | 327 | ceph_assert(!cb); |
7c673cae FG |
328 | cb = new HandleDelayedWatchTimeout(self.lock()); |
329 | return cb; | |
330 | } | |
331 | ||
332 | void Watch::register_cb() | |
333 | { | |
11fdf7f2 | 334 | std::lock_guard l(osd->watch_lock); |
7c673cae FG |
335 | if (cb) { |
336 | dout(15) << "re-registering callback, timeout: " << timeout << dendl; | |
337 | cb->cancel(); | |
338 | osd->watch_timer.cancel_event(cb); | |
339 | } else { | |
340 | dout(15) << "registering callback, timeout: " << timeout << dendl; | |
341 | } | |
342 | cb = new HandleWatchTimeout(self.lock()); | |
3efd9988 FG |
343 | if (!osd->watch_timer.add_event_after(timeout, cb)) { |
344 | cb = nullptr; | |
345 | } | |
7c673cae FG |
346 | } |
347 | ||
348 | void Watch::unregister_cb() | |
349 | { | |
350 | dout(15) << "unregister_cb" << dendl; | |
351 | if (!cb) | |
352 | return; | |
353 | dout(15) << "actually registered, cancelling" << dendl; | |
354 | cb->cancel(); | |
355 | { | |
11fdf7f2 | 356 | std::lock_guard l(osd->watch_lock); |
7c673cae FG |
357 | osd->watch_timer.cancel_event(cb); // harmless if not registered with timer |
358 | } | |
11fdf7f2 | 359 | cb = nullptr; |
7c673cae FG |
360 | } |
361 | ||
362 | void Watch::got_ping(utime_t t) | |
363 | { | |
364 | last_ping = t; | |
9f95a23c | 365 | if (is_connected()) { |
7c673cae FG |
366 | register_cb(); |
367 | } | |
368 | } | |
369 | ||
370 | void Watch::connect(ConnectionRef con, bool _will_ping) | |
371 | { | |
9f95a23c | 372 | if (is_connected(con.get())) { |
7c673cae FG |
373 | dout(10) << __func__ << " con " << con << " - already connected" << dendl; |
374 | return; | |
375 | } | |
376 | dout(10) << __func__ << " con " << con << dendl; | |
377 | conn = con; | |
378 | will_ping = _will_ping; | |
11fdf7f2 TL |
379 | auto priv = con->get_priv(); |
380 | if (priv) { | |
381 | auto sessionref = static_cast<Session*>(priv.get()); | |
7c673cae | 382 | sessionref->wstate.addWatch(self.lock()); |
11fdf7f2 | 383 | priv.reset(); |
f67539c2 | 384 | for (auto i = in_progress_notifies.begin(); |
7c673cae FG |
385 | i != in_progress_notifies.end(); |
386 | ++i) { | |
387 | send_notify(i->second); | |
388 | } | |
389 | } | |
390 | if (will_ping) { | |
391 | last_ping = ceph_clock_now(); | |
392 | register_cb(); | |
393 | } else { | |
394 | unregister_cb(); | |
395 | } | |
396 | } | |
397 | ||
398 | void Watch::disconnect() | |
399 | { | |
400 | dout(10) << "disconnect (con was " << conn << ")" << dendl; | |
401 | conn = ConnectionRef(); | |
402 | if (!will_ping) | |
403 | register_cb(); | |
404 | } | |
405 | ||
406 | void Watch::discard() | |
407 | { | |
408 | dout(10) << "discard" << dendl; | |
f67539c2 | 409 | for (auto i = in_progress_notifies.begin(); |
7c673cae FG |
410 | i != in_progress_notifies.end(); |
411 | ++i) { | |
412 | i->second->discard(); | |
413 | } | |
414 | discard_state(); | |
415 | } | |
416 | ||
417 | void Watch::discard_state() | |
418 | { | |
11fdf7f2 TL |
419 | ceph_assert(pg->is_locked()); |
420 | ceph_assert(!discarded); | |
421 | ceph_assert(obc); | |
7c673cae FG |
422 | in_progress_notifies.clear(); |
423 | unregister_cb(); | |
424 | discarded = true; | |
9f95a23c | 425 | if (is_connected()) { |
11fdf7f2 TL |
426 | if (auto priv = conn->get_priv(); priv) { |
427 | auto session = static_cast<Session*>(priv.get()); | |
428 | session->wstate.removeWatch(self.lock()); | |
7c673cae FG |
429 | } |
430 | conn = ConnectionRef(); | |
431 | } | |
432 | obc = ObjectContextRef(); | |
433 | } | |
434 | ||
435 | bool Watch::is_discarded() const | |
436 | { | |
437 | return discarded; | |
438 | } | |
439 | ||
440 | void Watch::remove(bool send_disconnect) | |
441 | { | |
442 | dout(10) << "remove" << dendl; | |
9f95a23c | 443 | if (send_disconnect && is_connected()) { |
7c673cae FG |
444 | bufferlist empty; |
445 | MWatchNotify *reply(new MWatchNotify(cookie, 0, 0, | |
446 | CEPH_WATCH_EVENT_DISCONNECT, empty)); | |
447 | conn->send_message(reply); | |
448 | } | |
f67539c2 | 449 | for (auto i = in_progress_notifies.begin(); |
7c673cae FG |
450 | i != in_progress_notifies.end(); |
451 | ++i) { | |
452 | i->second->complete_watcher_remove(self.lock()); | |
453 | } | |
454 | discard_state(); | |
455 | } | |
456 | ||
457 | void Watch::start_notify(NotifyRef notif) | |
458 | { | |
11fdf7f2 | 459 | ceph_assert(in_progress_notifies.find(notif->notify_id) == |
7c673cae FG |
460 | in_progress_notifies.end()); |
461 | if (will_ping) { | |
462 | utime_t cutoff = ceph_clock_now(); | |
463 | cutoff.sec_ref() -= timeout; | |
464 | if (last_ping < cutoff) { | |
465 | dout(10) << __func__ << " " << notif->notify_id | |
466 | << " last_ping " << last_ping << " < cutoff " << cutoff | |
467 | << ", disconnecting" << dendl; | |
468 | disconnect(); | |
469 | return; | |
470 | } | |
471 | } | |
472 | dout(10) << "start_notify " << notif->notify_id << dendl; | |
473 | in_progress_notifies[notif->notify_id] = notif; | |
474 | notif->start_watcher(self.lock()); | |
9f95a23c | 475 | if (is_connected()) |
7c673cae FG |
476 | send_notify(notif); |
477 | } | |
478 | ||
479 | void Watch::cancel_notify(NotifyRef notif) | |
480 | { | |
481 | dout(10) << "cancel_notify " << notif->notify_id << dendl; | |
482 | in_progress_notifies.erase(notif->notify_id); | |
483 | } | |
484 | ||
485 | void Watch::send_notify(NotifyRef notif) | |
486 | { | |
487 | dout(10) << "send_notify" << dendl; | |
488 | MWatchNotify *notify_msg = new MWatchNotify( | |
9f95a23c TL |
489 | cookie, |
490 | notif->version, | |
491 | notif->notify_id, | |
492 | CEPH_WATCH_EVENT_NOTIFY, | |
493 | notif->payload, | |
494 | notif->client_gid); | |
7c673cae FG |
495 | conn->send_message(notify_msg); |
496 | } | |
497 | ||
498 | void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl) | |
499 | { | |
500 | dout(10) << "notify_ack" << dendl; | |
f67539c2 | 501 | auto i = in_progress_notifies.find(notify_id); |
7c673cae FG |
502 | if (i != in_progress_notifies.end()) { |
503 | i->second->complete_watcher(self.lock(), reply_bl); | |
504 | in_progress_notifies.erase(i); | |
505 | } | |
506 | } | |
507 | ||
508 | WatchRef Watch::makeWatchRef( | |
509 | PrimaryLogPG *pg, OSDService *osd, | |
510 | ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr) | |
511 | { | |
512 | WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr)); | |
513 | ret->set_self(ret); | |
514 | return ret; | |
515 | } | |
516 | ||
517 | void WatchConState::addWatch(WatchRef watch) | |
518 | { | |
11fdf7f2 | 519 | std::lock_guard l(lock); |
7c673cae FG |
520 | watches.insert(watch); |
521 | } | |
522 | ||
523 | void WatchConState::removeWatch(WatchRef watch) | |
524 | { | |
11fdf7f2 | 525 | std::lock_guard l(lock); |
7c673cae FG |
526 | watches.erase(watch); |
527 | } | |
528 | ||
529 | void WatchConState::reset(Connection *con) | |
530 | { | |
531 | set<WatchRef> _watches; | |
532 | { | |
11fdf7f2 | 533 | std::lock_guard l(lock); |
7c673cae FG |
534 | _watches.swap(watches); |
535 | } | |
536 | for (set<WatchRef>::iterator i = _watches.begin(); | |
537 | i != _watches.end(); | |
538 | ++i) { | |
539 | boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg()); | |
540 | pg->lock(); | |
541 | if (!(*i)->is_discarded()) { | |
542 | if ((*i)->is_connected(con)) { | |
543 | (*i)->disconnect(); | |
544 | } else { | |
545 | lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl; | |
546 | } | |
547 | } | |
548 | pg->unlock(); | |
549 | } | |
550 | } |